From 5ba7b66e9e5e2519db11299d9d25b155603d578f Mon Sep 17 00:00:00 2001 From: pengkun0129 Date: Wed, 3 Jun 2026 02:04:10 +0000 Subject: [PATCH] =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E6=96=87=E4=BB=B6=E8=87=B3?= =?UTF-8?q?=E3=80=8Cmxpic=5Frouter=E3=80=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mxpic_router/builder.py | 1226 ++++++++++++++++++++------------------- 1 file changed, 622 insertions(+), 604 deletions(-) diff --git a/mxpic_router/builder.py b/mxpic_router/builder.py index 18cc7ab..edf19d9 100644 --- a/mxpic_router/builder.py +++ b/mxpic_router/builder.py @@ -1,232 +1,234 @@ -import math -import os -import sys -from typing import Dict, Optional - -import yaml - -from .eda_loader import CellSpec, InstanceSpec, LinkSpec, load_cell_spec -from .technology import apply_technology_manifest, load_technology_manifest - - -def build_project_gds( - project_dir: str, - output_path: str, - pdk_root: str, - technology_manifest_path: Optional[str] = None, - prefer_full_gds: bool = False, - target_cell_name: Optional[str] = None, -) -> dict: - import nazca as nd - - Route = _import_route_backend(nd) - manifest = load_technology_manifest(technology_manifest_path) - apply_technology_manifest(manifest, nd) - - specs = _load_project_specs(project_dir) - if not specs: - raise ValueError("No saved cell YAML files found for this project") - - built_cells = {} - warnings = [] - for spec in _ordered_specs(specs): - built_cells[spec.name] = _build_cell(spec, built_cells, pdk_root, prefer_full_gds, Route, nd, warnings) - - top = _select_top_spec(specs, target_cell_name) - os.makedirs(os.path.dirname(output_path), exist_ok=True) - nd.export_gds(topcells=[built_cells[top.name]], filename=output_path) - return { - "output_path": output_path, - "engine": "mxpic_router", - "cells_built": [spec.name for spec in _ordered_specs(specs)], - "warnings": warnings, - } - - -def _load_project_specs(project_dir: str) -> Dict[str, CellSpec]: - specs = {} - for filename in sorted(os.listdir(project_dir)): - if not filename.lower().endswith((".yml", ".yaml")): - continue - spec = load_cell_spec(os.path.join(project_dir, filename)) - specs[spec.name] = spec - return specs - - -def _ordered_specs(specs: Dict[str, CellSpec]): - composites = [spec for spec in specs.values() if spec.type != "project"] - projects = [spec for spec in specs.values() if spec.type == "project"] - return composites + projects - - -def _select_top_spec(specs: Dict[str, CellSpec], target_cell_name: Optional[str]): - if target_cell_name: - if target_cell_name not in specs: - raise ValueError(f"Target cell not found for routed build: {target_cell_name}") - return specs[target_cell_name] - return _ordered_specs(specs)[-1] - - +import math +import os +import sys +from typing import Dict, Optional + +import yaml + +from .eda_loader import CellSpec, InstanceSpec, LinkSpec, load_cell_spec +from .technology import apply_technology_manifest, load_technology_manifest + + +def build_project_gds( + project_dir: str, + output_path: str, + pdk_root: str, + technology_manifest_path: Optional[str] = None, + prefer_full_gds: bool = False, + target_cell_name: Optional[str] = None, +) -> dict: + import nazca as nd + + Route = _import_route_backend(nd) + manifest = load_technology_manifest(technology_manifest_path) + apply_technology_manifest(manifest, nd) + + specs = _load_project_specs(project_dir) + if not specs: + raise ValueError("No saved cell YAML files found for this project") + + built_cells = {} + warnings = [] + for spec in _ordered_specs(specs): + built_cells[spec.name] = _build_cell(spec, built_cells, pdk_root, prefer_full_gds, Route, nd, warnings) + + top = _select_top_spec(specs, target_cell_name) + os.makedirs(os.path.dirname(output_path), exist_ok=True) + nd.export_gds(topcells=[built_cells[top.name]], filename=output_path) + return { + "output_path": output_path, + "engine": "mxpic_router", + "cells_built": [spec.name for spec in _ordered_specs(specs)], + "warnings": warnings, + } + + +def _load_project_specs(project_dir: str) -> Dict[str, CellSpec]: + specs = {} + for filename in sorted(os.listdir(project_dir)): + if not filename.lower().endswith((".yml", ".yaml")): + continue + spec = load_cell_spec(os.path.join(project_dir, filename)) + specs[spec.name] = spec + return specs + + +def _ordered_specs(specs: Dict[str, CellSpec]): + composites = [spec for spec in specs.values() if spec.type != "project"] + projects = [spec for spec in specs.values() if spec.type == "project"] + return composites + projects + + +def _select_top_spec(specs: Dict[str, CellSpec], target_cell_name: Optional[str]): + if target_cell_name: + if target_cell_name not in specs: + raise ValueError(f"Target cell not found for routed build: {target_cell_name}") + return specs[target_cell_name] + return _ordered_specs(specs)[-1] + + def _build_cell(spec: CellSpec, built_cells: dict, pdk_root: str, prefer_full_gds: bool, Route, nd, warnings: list): pin_map = {} with nd.Cell(name=spec.name) as top: for instance_name, instance in spec.instances.items(): + layout_rotation = _layout_rotation(instance.rotation) if _is_basic_component(instance.component): basic_cell = _build_basic_component(instance, nd) - placed = basic_cell.put(instance.x, instance.y, instance.rotation, flip=instance.flip, flop=instance.flop) + placed = basic_cell.put(instance.x, instance.y, layout_rotation, flip=instance.flip, flop=instance.flop) for pin_name in getattr(placed, "pin", {}): if pin_name != "org": pin_map[(instance_name, pin_name)] = placed.pin[pin_name] continue if instance.component in built_cells: - placed = built_cells[instance.component].put(instance.x, instance.y, instance.rotation, flip=instance.flip, flop=instance.flop) + placed = built_cells[instance.component].put(instance.x, instance.y, layout_rotation, flip=instance.flip, flop=instance.flop) for pin_name in getattr(placed, "pin", {}): if pin_name != "org": pin_map[(instance_name, pin_name)] = placed.pin[pin_name] continue - - asset = _resolve_pdk_asset(pdk_root, instance.component, prefer_full_gds) + + asset = _resolve_pdk_asset(pdk_root, instance.component, prefer_full_gds) if not asset.get("gds_path"): warnings.append(f"Missing GDS for {instance_name}: {instance.component}") continue loaded = nd.load_gds(asset["gds_path"]) - loaded.put(instance.x, instance.y, instance.rotation, flip=instance.flip, flop=instance.flop) + loaded.put(instance.x, instance.y, layout_rotation, flip=instance.flip, flop=instance.flop) _register_metadata_pins(pin_map, instance_name, instance, asset.get("metadata") or {}, nd, pdk_root) - - for pin_name, pin in spec.pins.items(): - pin_out = nd.Pin(pin_name, width=pin.width).put(pin.x, pin.y, pin.angle) - pin_in = pin_out.move(0, 0, 180) - pin_map[("this", pin_name)] = pin_in - pin_map[(pin_name, pin_name)] = pin_in - - for element_name, element in spec.elements.items(): - _register_element_pins(pin_map, element_name, element, nd) - - for bundle in spec.bundles.values(): - for link in bundle.links: - _route_link(link, pin_map, Route, warnings) - return top - - -def _is_basic_component(component: str) -> bool: - return component in {"waveguide", "90 bend", "180 bend", "cricle", "circle", "taper"} - - -def _build_basic_component(instance: InstanceSpec, nd): - settings = instance.settings or {} - component = "cricle" if instance.component == "circle" else instance.component - width = _safe_float(settings.get("width"), _safe_float(settings.get("width1"), 0.5)) or 0.5 - xs = settings.get("xsection") or settings.get("xs") or "strip" - radius = _safe_float(settings.get("radius"), 10) or 10 - length = _safe_float(settings.get("length"), 100) or 100 - name = f"basic_{component.replace(' ', '_')}_{_safe_cell_name(instance.name)}" - with nd.Cell(name=name, instantiate=False) as cell: - if component == "waveguide": - line = nd.strt(length=length, width=width, xs=xs).put(0, 0, 0) - nd.Pin("a1", width=width, xs=xs).put(line.pin["a0"]) - nd.Pin("b1", width=width, xs=xs).put(line.pin["b0"]) - elif component == "90 bend": - bend = nd.bend(radius=radius, width=width, angle=90, xs=xs).put(0, 0, 0) - nd.Pin("a1", width=width, xs=xs).put(bend.pin["a0"]) - nd.Pin("b1", width=width, xs=xs).put(bend.pin["b0"]) - elif component == "180 bend": - bend = nd.bend(radius=radius, width=width, angle=180, xs=xs).put(0, 0, 0) - nd.Pin("a1", width=width, xs=xs).put(bend.pin["a0"]) - nd.Pin("b1", width=width, xs=xs).put(bend.pin["b0"]) - elif component == "cricle": - bend = nd.bend(radius=radius, width=width, angle=360, xs=xs).put(0, 0, 0) - nd.Pin("a1", width=width, xs=xs).put(bend.pin["a0"]) - nd.Pin("b1", width=width, xs=xs).put(bend.pin["b0"]) - elif component == "taper": - width1 = _safe_float(settings.get("width1"), width) or width - width2 = _safe_float(settings.get("width2"), width) or width - taper = nd.taper(length=length, width1=width1, width2=width2, xs=xs).put(0, 0, 0) - nd.Pin("a1", width=width1, xs=xs).put(taper.pin["a0"]) - nd.Pin("b1", width=width2, xs=xs).put(taper.pin["b0"]) - return cell - - + + for pin_name, pin in spec.pins.items(): + pin_out = nd.Pin(pin_name, width=pin.width).put(pin.x, pin.y, pin.angle) + pin_in = pin_out.move(0, 0, 180) + pin_map[("this", pin_name)] = pin_in + pin_map[(pin_name, pin_name)] = pin_in + + for element_name, element in spec.elements.items(): + _register_element_pins(pin_map, element_name, element, nd) + + for bundle in spec.bundles.values(): + for link in bundle.links: + _route_link(link, pin_map, Route, warnings) + return top + + +def _is_basic_component(component: str) -> bool: + return component in {"waveguide", "90 bend", "180 bend", "cricle", "circle", "taper"} + + +def _build_basic_component(instance: InstanceSpec, nd): + settings = instance.settings or {} + component = "cricle" if instance.component == "circle" else instance.component + width = _safe_float(settings.get("width"), _safe_float(settings.get("width1"), 0.5)) or 0.5 + xs = settings.get("xsection") or settings.get("xs") or "strip" + radius = _safe_float(settings.get("radius"), 10) or 10 + length = _safe_float(settings.get("length"), 100) or 100 + name = f"basic_{component.replace(' ', '_')}_{_safe_cell_name(instance.name)}" + with nd.Cell(name=name, instantiate=False) as cell: + if component == "waveguide": + line = nd.strt(length=length, width=width, xs=xs).put(0, 0, 0) + nd.Pin("a1", width=width, xs=xs).put(line.pin["a0"]) + nd.Pin("b1", width=width, xs=xs).put(line.pin["b0"]) + elif component == "90 bend": + bend = nd.bend(radius=radius, width=width, angle=90, xs=xs).put(0, 0, 0) + nd.Pin("a1", width=width, xs=xs).put(bend.pin["a0"]) + nd.Pin("b1", width=width, xs=xs).put(bend.pin["b0"]) + elif component == "180 bend": + bend = nd.bend(radius=radius, width=width, angle=180, xs=xs).put(0, 0, 0) + nd.Pin("a1", width=width, xs=xs).put(bend.pin["a0"]) + nd.Pin("b1", width=width, xs=xs).put(bend.pin["b0"]) + elif component == "cricle": + bend = nd.bend(radius=radius, width=width, angle=360, xs=xs).put(0, 0, 0) + nd.Pin("a1", width=width, xs=xs).put(bend.pin["a0"]) + nd.Pin("b1", width=width, xs=xs).put(bend.pin["b0"]) + elif component == "taper": + width1 = _safe_float(settings.get("width1"), width) or width + width2 = _safe_float(settings.get("width2"), width) or width + taper = nd.taper(length=length, width1=width1, width2=width2, xs=xs).put(0, 0, 0) + nd.Pin("a1", width=width1, xs=xs).put(taper.pin["a0"]) + nd.Pin("b1", width=width2, xs=xs).put(taper.pin["b0"]) + return cell + + def _register_element_pins(pin_map: dict, element_name: str, element, nd) -> None: element_cell = _build_element_cell(element_name, element, nd) placed = element_cell.put(element.x, element.y, element.angle) - if element.type == "port": - for pin in _port_element_pin_entries(element_name, element): - pin_map[(element_name, pin["name"])] = placed.pin[f'{pin["name"]}_in'] - return - - for pin in _anchor_element_pin_entries(element_name, element): - pin_map[(element_name, pin["name"])] = placed.pin[pin["name"]] - - -def _build_element_cell(element_name: str, element, nd): - width = element.width or 0.5 - port_number = max(1, int(getattr(element, "port_number", 1) or 1)) - pitch = _safe_float(getattr(element, "pitch", 10.0), 10.0) or 10.0 - with nd.Cell(name=f"element_{_safe_cell_name(element_name)}", instantiate=False) as cell: - if element.type == "port": - for index, pin in enumerate(_port_element_pin_entries(element_name, element)): - local_y = 0.0 if port_number == 1 else _element_port_offset(index, port_number, pitch) - pin_out = nd.Pin(pin["name"], width=width).put(0.0, local_y, 180.0) - pin_in = pin_out.move(0, 0, 180) - nd.Pin(f'{pin["name"]}_in', width=width).put(pin_in.x, pin_in.y, pin_in.a) - return cell - - anchor_pins = _anchor_element_pin_entries(element_name, element) - for index in range(port_number): - local_y = -15.0 + _element_port_offset(index, port_number, pitch) - left_pin = anchor_pins[index * 2] - right_pin = anchor_pins[index * 2 + 1] - nd.Pin(left_pin["name"], width=width).put(0.0, local_y, 180.0) - nd.Pin(right_pin["name"], width=width).put(0.0, local_y, 0.0) - return cell - - -def _element_port_offset(index: int, count: int, pitch: float) -> float: - return ((count - 1) / 2 - index) * pitch - - -def _port_element_pin_entries(element_name: str, element) -> list: - count = max(1, int(getattr(element, "port_number", 1) or 1)) - defaults = [{"role": f"io{index + 1}", "name": f"{_safe_cell_name(element_name)}_io{index + 1}"} for index in range(count)] - return _named_pin_entries(defaults, getattr(element, "pins", None)) - - -def _anchor_element_pin_entries(element_name: str, element) -> list: - count = max(1, int(getattr(element, "port_number", 1) or 1)) - defaults = [] - for index in range(count): - number = index + 1 - defaults.append({"role": f"a{number}", "name": f"{_safe_cell_name(element_name)}_a{number}"}) - defaults.append({"role": f"b{number}", "name": f"{_safe_cell_name(element_name)}_b{number}"}) - return _named_pin_entries(defaults, getattr(element, "pins", None)) - - -def _named_pin_entries(defaults: list, overrides) -> list: - by_role = {str(pin.get("role") or ""): str(pin.get("name") or "") for pin in overrides or []} - by_index = [str(pin.get("name") or "") for pin in overrides or []] - entries = [] - for index, default in enumerate(defaults): - role = default["role"] - name = by_role.get(role) or (by_index[index] if index < len(by_index) else "") or default["name"] - entries.append({"role": role, "name": _safe_cell_name(name)}) - return entries - - -def _transform_element_port(local_x: float, local_y: float, angle: float, element) -> tuple: - rotation = math.radians(_safe_float(getattr(element, "angle", 0.0), 0.0) or 0.0) - cos_v = math.cos(rotation) - sin_v = math.sin(rotation) - x = element.x + local_x * cos_v - local_y * sin_v - y = element.y + local_x * sin_v + local_y * cos_v - return x, y, angle - - -def _safe_cell_name(value: str) -> str: - return "".join(ch if ch.isalnum() or ch == "_" else "_" for ch in str(value or "inst")) - - + if element.type == "port": + for pin in _port_element_pin_entries(element_name, element): + pin_map[(element_name, pin["name"])] = placed.pin[f'{pin["name"]}_in'] + return + + for pin in _anchor_element_pin_entries(element_name, element): + pin_map[(element_name, pin["name"])] = placed.pin[pin["name"]] + + +def _build_element_cell(element_name: str, element, nd): + width = element.width or 0.5 + port_number = max(1, int(getattr(element, "port_number", 1) or 1)) + pitch = _safe_float(getattr(element, "pitch", 10.0), 10.0) or 10.0 + with nd.Cell(name=f"element_{_safe_cell_name(element_name)}", instantiate=False) as cell: + if element.type == "port": + for index, pin in enumerate(_port_element_pin_entries(element_name, element)): + local_y = 0.0 if port_number == 1 else _element_port_offset(index, port_number, pitch) + pin_out = nd.Pin(pin["name"], width=width).put(0.0, local_y, 180.0) + pin_in = pin_out.move(0, 0, 180) + nd.Pin(f'{pin["name"]}_in', width=width).put(pin_in.x, pin_in.y, pin_in.a) + return cell + + anchor_pins = _anchor_element_pin_entries(element_name, element) + for index in range(port_number): + local_y = -15.0 + _element_port_offset(index, port_number, pitch) + left_pin = anchor_pins[index * 2] + right_pin = anchor_pins[index * 2 + 1] + nd.Pin(left_pin["name"], width=width).put(0.0, local_y, 180.0) + nd.Pin(right_pin["name"], width=width).put(0.0, local_y, 0.0) + return cell + + +def _element_port_offset(index: int, count: int, pitch: float) -> float: + return ((count - 1) / 2 - index) * pitch + + +def _port_element_pin_entries(element_name: str, element) -> list: + count = max(1, int(getattr(element, "port_number", 1) or 1)) + defaults = [{"role": f"io{index + 1}", "name": f"{_safe_cell_name(element_name)}_io{index + 1}"} for index in range(count)] + return _named_pin_entries(defaults, getattr(element, "pins", None)) + + +def _anchor_element_pin_entries(element_name: str, element) -> list: + count = max(1, int(getattr(element, "port_number", 1) or 1)) + defaults = [] + for index in range(count): + number = index + 1 + defaults.append({"role": f"a{number}", "name": f"{_safe_cell_name(element_name)}_a{number}"}) + defaults.append({"role": f"b{number}", "name": f"{_safe_cell_name(element_name)}_b{number}"}) + return _named_pin_entries(defaults, getattr(element, "pins", None)) + + +def _named_pin_entries(defaults: list, overrides) -> list: + by_role = {str(pin.get("role") or ""): str(pin.get("name") or "") for pin in overrides or []} + by_index = [str(pin.get("name") or "") for pin in overrides or []] + entries = [] + for index, default in enumerate(defaults): + role = default["role"] + name = by_role.get(role) or (by_index[index] if index < len(by_index) else "") or default["name"] + entries.append({"role": role, "name": _safe_cell_name(name)}) + return entries + + +def _transform_element_port(local_x: float, local_y: float, angle: float, element) -> tuple: + rotation = math.radians(_safe_float(getattr(element, "angle", 0.0), 0.0) or 0.0) + cos_v = math.cos(rotation) + sin_v = math.sin(rotation) + x = element.x + local_x * cos_v - local_y * sin_v + y = element.y + local_x * sin_v + local_y * cos_v + return x, y, angle + + +def _safe_cell_name(value: str) -> str: + return "".join(ch if ch.isalnum() or ch == "_" else "_" for ch in str(value or "inst")) + + def _register_metadata_pins(pin_map, instance_name, instance, metadata: dict, nd, pdk_root: str) -> None: + layout_rotation = _layout_rotation(instance.rotation) for pin_name, pin in _metadata_pins(metadata, pdk_root).items(): x, y, angle = _transform_port( _safe_float(pin.get("x"), 0.0), @@ -234,238 +236,238 @@ def _register_metadata_pins(pin_map, instance_name, instance, metadata: dict, nd _safe_float(pin.get("a", pin.get("angle")), 0.0), instance.x, instance.y, - instance.rotation, + layout_rotation, instance.flip, instance.flop, ) - width = _safe_float(pin.get("width"), 0.5) or 0.5 - pin_map[(instance_name, str(pin_name))] = nd.Pin( - f"{instance_name}_{pin_name}", - width=width, - xs="strip", - ).put(x, y, angle) - - -def _metadata_pins(metadata: dict, pdk_root: str) -> dict: - if not metadata: - return {} - if isinstance(metadata.get("pins"), dict): - return metadata.get("pins") or {} - if isinstance(metadata.get("ports"), dict) and _allows_pdk_ports_as_pins(pdk_root): - return metadata.get("ports") or {} - return {} - - -def _allows_pdk_ports_as_pins(pdk_root: str) -> bool: - normalized = os.path.abspath(str(pdk_root or "")).replace("\\", "/").lower() - return "/opt_pdk_public/" in f"{normalized}/" or "/opt_pdk_atlas/" in f"{normalized}/" - - -def _metal_route_pcb_enabled(xsection: str) -> bool: - normalized = str(xsection or "").strip().lower().replace("-", "_") - return normalized in {"metal_1", "metal1", "metal_2", "metal2"} - - -def _route_link(link: LinkSpec, pin_map: dict, Route, warnings: list) -> None: - route = Route(radius=link.radius or 10, width=link.width, xs=link.xsection, PCB=_metal_route_pcb_enabled(link.xsection)) - p1 = pin_map.get((link.src_inst, link.src_pin)) - p2 = pin_map.get((link.dst_inst, link.dst_pin)) - has_pin_endpoints = bool(link.src_inst or link.src_pin or link.dst_inst or link.dst_pin) - if len(link.points or []) >= 2: - if has_pin_endpoints: - if p1 is None or p2 is None: - warnings.append(f"Missing route pin for {link.src_inst}:{link.src_pin} -> {link.dst_inst}:{link.dst_pin}") - return - points = _route_points_with_pin_endpoints(link.points, p1, p2) - else: - points = link.points - if _route_guided_link(link, route, warnings, points): - return - if p1 is None or p2 is None: - warnings.append(f"Missing route pin for {link.src_inst}:{link.src_pin} -> {link.dst_inst}:{link.dst_pin}") - return - method_name = _route_method_name_for_pins(p1, p2) - route_method = getattr(route, method_name, None) - if route_method is None: - warnings.append(f"Route method {method_name} unavailable; falling back to sbend_p2p") - route_method = route.sbend_p2p - route_method( - pin1=p1, - pin2=p2, - width=link.width, - radius=link.radius or 10, - xs=link.xsection, - arrow=False, - ).put() - -def _route_guided_link(link: LinkSpec, route, warnings: list, points_override=None) -> bool: - route_method = getattr(route, "strt_p2p", None) - if route_method is None: - warnings.append("Manual route points require Route.strt_p2p; falling back to automatic p2p route") - return False - bend_method = getattr(route, "bend_p2p", None) - source_points = points_override if points_override is not None else (link.points or []) - points = [ - {"x": _safe_float(point.get("x"), None), "y": _safe_float(point.get("y"), None)} - for point in source_points - ] - points = [point for point in points if point["x"] is not None and point["y"] is not None] - if len(points) < 2: - return False - if bend_method is None: - warnings.append("Manual route bends require Route.bend_p2p; corners were exported as straight joins") - plan = _guided_route_plan(points, (link.radius or 10) if bend_method is not None else 0) - for straight in plan["straights"]: - route_method( - pin1=(straight["start"]["x"], straight["start"]["y"], straight["angle"]), - pin2=(straight["end"]["x"], straight["end"]["y"], straight["angle"]), - width=link.width, - xs=link.xsection, - arrow=False, - ).put() - if bend_method is None: - return True - for bend in plan["bends"]: - bend_method( - pin1=(bend["start"]["x"], bend["start"]["y"], bend["angle_in"]), - pin2=(bend["end"]["x"], bend["end"]["y"], bend["angle_out"]), - radius=link.radius or 10, - width=link.width, - xs=link.xsection, - arrow=False, - ).put() - return True - - -def _route_points_with_pin_endpoints(points: list, source_pin=None, target_pin=None) -> list: - clean_points = [ - {"x": _safe_float(point.get("x"), None), "y": _safe_float(point.get("y"), None)} - for point in points or [] - if isinstance(point, dict) - ] - clean_points = [point for point in clean_points if point["x"] is not None and point["y"] is not None] - if len(clean_points) < 2: - return clean_points - source_point = _pin_point(source_pin) - target_point = _pin_point(target_pin) - if source_point is not None: - clean_points[0] = source_point - if target_point is not None: - clean_points[-1] = target_point - return clean_points - - -def _pin_point(pin): - for source in (pin, getattr(pin, "pointer", None)): - if source is None: - continue - x = _safe_float(getattr(source, "x", None), None) - y = _safe_float(getattr(source, "y", None), None) - if x is not None and y is not None: - return {"x": float(x), "y": float(y)} - xya = getattr(source, "xya", None) - if callable(xya): - try: - values = xya() - except TypeError: - values = None - if values and len(values) >= 2: - x = _safe_float(values[0], None) - y = _safe_float(values[1], None) - if x is not None and y is not None: - return {"x": float(x), "y": float(y)} - return None - - -def _guided_route_plan(points: list, radius: float) -> dict: - clean_points = [ - {"x": _safe_float(point.get("x"), None), "y": _safe_float(point.get("y"), None)} - for point in points or [] - if isinstance(point, dict) - ] - clean_points = [point for point in clean_points if point["x"] is not None and point["y"] is not None] - if len(clean_points) < 2: - return {"straights": [], "bends": []} - - trim_radius = max(_safe_float(radius, 0.0) or 0.0, 0.0) - corner_trims = {} - bends = [] - for index in range(1, len(clean_points) - 1): - previous_point = clean_points[index - 1] - corner = clean_points[index] - next_point = clean_points[index + 1] - angle_in = _segment_angle(previous_point, corner) - angle_out = _segment_angle(corner, next_point) - if angle_in is None or angle_out is None or angle_in == angle_out: - continue - turn_delta = abs(((angle_out - angle_in + 180) % 360) - 180) - if turn_delta >= 179: - continue - previous_length = _distance(previous_point, corner) - next_length = _distance(corner, next_point) - trim = min(trim_radius, previous_length / 2.0, next_length / 2.0) - if trim <= 1e-9: - continue - before = _point_toward(corner, previous_point, trim) - after = _point_toward(corner, next_point, trim) - corner_trims[index] = {"before": before, "after": after} - bends.append({ - "start": before, - "corner": {"x": float(corner["x"]), "y": float(corner["y"])}, - "end": after, - "angle_in": angle_in, - "angle_out": angle_out, - }) - - straights = [] - for index in range(len(clean_points) - 1): - start = corner_trims.get(index, {}).get("after", clean_points[index]) - end = corner_trims.get(index + 1, {}).get("before", clean_points[index + 1]) - angle = _segment_angle(start, end) - if angle is None or _distance(start, end) <= 1e-9: - continue - straights.append({ - "start": {"x": float(start["x"]), "y": float(start["y"])}, - "end": {"x": float(end["x"]), "y": float(end["y"])}, - "angle": angle, - }) - - return {"straights": straights, "bends": bends} - - -def _distance(point1: dict, point2: dict) -> float: - return math.hypot(point2["x"] - point1["x"], point2["y"] - point1["y"]) - - -def _point_toward(origin: dict, target: dict, distance: float) -> dict: - total = _distance(origin, target) - if total <= 1e-9: - return {"x": float(origin["x"]), "y": float(origin["y"])} - ratio = distance / total - return { - "x": float(origin["x"] + (target["x"] - origin["x"]) * ratio), - "y": float(origin["y"] + (target["y"] - origin["y"]) * ratio), - } - - -def _segment_angle(point1: dict, point2: dict): - dx = point2["x"] - point1["x"] - dy = point2["y"] - point1["y"] - if abs(dx) < 1e-9 and abs(dy) < 1e-9: - return None - if abs(dx) >= abs(dy): - return 0 if dx >= 0 else 180 - return 90 if dy >= 0 else 270 - - -def _pins_have_same_rotation(pin1, pin2, tolerance: float = 1e-6) -> bool: - angle1 = _pin_angle(pin1) - angle2 = _pin_angle(pin2) - if angle1 is None or angle2 is None: - return False - return abs(((angle1 - angle2 + 180) % 360) - 180) <= tolerance - - + width = _safe_float(pin.get("width"), 0.5) or 0.5 + pin_map[(instance_name, str(pin_name))] = nd.Pin( + f"{instance_name}_{pin_name}", + width=width, + xs="strip", + ).put(x, y, angle) + + +def _metadata_pins(metadata: dict, pdk_root: str) -> dict: + if not metadata: + return {} + if isinstance(metadata.get("pins"), dict): + return metadata.get("pins") or {} + if isinstance(metadata.get("ports"), dict) and _allows_pdk_ports_as_pins(pdk_root): + return metadata.get("ports") or {} + return {} + + +def _allows_pdk_ports_as_pins(pdk_root: str) -> bool: + normalized = os.path.abspath(str(pdk_root or "")).replace("\\", "/").lower() + return "/opt_pdk_public/" in f"{normalized}/" or "/opt_pdk_atlas/" in f"{normalized}/" + + +def _metal_route_pcb_enabled(xsection: str) -> bool: + normalized = str(xsection or "").strip().lower().replace("-", "_") + return normalized in {"metal_1", "metal1", "metal_2", "metal2"} + + +def _route_link(link: LinkSpec, pin_map: dict, Route, warnings: list) -> None: + route = Route(radius=link.radius or 10, width=link.width, xs=link.xsection, PCB=_metal_route_pcb_enabled(link.xsection)) + p1 = pin_map.get((link.src_inst, link.src_pin)) + p2 = pin_map.get((link.dst_inst, link.dst_pin)) + has_pin_endpoints = bool(link.src_inst or link.src_pin or link.dst_inst or link.dst_pin) + if len(link.points or []) >= 2: + if has_pin_endpoints: + if p1 is None or p2 is None: + warnings.append(f"Missing route pin for {link.src_inst}:{link.src_pin} -> {link.dst_inst}:{link.dst_pin}") + return + points = _route_points_with_pin_endpoints(link.points, p1, p2) + else: + points = link.points + if _route_guided_link(link, route, warnings, points): + return + if p1 is None or p2 is None: + warnings.append(f"Missing route pin for {link.src_inst}:{link.src_pin} -> {link.dst_inst}:{link.dst_pin}") + return + method_name = _route_method_name_for_pins(p1, p2) + route_method = getattr(route, method_name, None) + if route_method is None: + warnings.append(f"Route method {method_name} unavailable; falling back to sbend_p2p") + route_method = route.sbend_p2p + route_method( + pin1=p1, + pin2=p2, + width=link.width, + radius=link.radius or 10, + xs=link.xsection, + arrow=False, + ).put() + +def _route_guided_link(link: LinkSpec, route, warnings: list, points_override=None) -> bool: + route_method = getattr(route, "strt_p2p", None) + if route_method is None: + warnings.append("Manual route points require Route.strt_p2p; falling back to automatic p2p route") + return False + bend_method = getattr(route, "bend_p2p", None) + source_points = points_override if points_override is not None else (link.points or []) + points = [ + {"x": _safe_float(point.get("x"), None), "y": _safe_float(point.get("y"), None)} + for point in source_points + ] + points = [point for point in points if point["x"] is not None and point["y"] is not None] + if len(points) < 2: + return False + if bend_method is None: + warnings.append("Manual route bends require Route.bend_p2p; corners were exported as straight joins") + plan = _guided_route_plan(points, (link.radius or 10) if bend_method is not None else 0) + for straight in plan["straights"]: + route_method( + pin1=(straight["start"]["x"], straight["start"]["y"], straight["angle"]), + pin2=(straight["end"]["x"], straight["end"]["y"], straight["angle"]), + width=link.width, + xs=link.xsection, + arrow=False, + ).put() + if bend_method is None: + return True + for bend in plan["bends"]: + bend_method( + pin1=(bend["start"]["x"], bend["start"]["y"], bend["angle_in"]), + pin2=(bend["end"]["x"], bend["end"]["y"], bend["angle_out"]), + radius=link.radius or 10, + width=link.width, + xs=link.xsection, + arrow=False, + ).put() + return True + + +def _route_points_with_pin_endpoints(points: list, source_pin=None, target_pin=None) -> list: + clean_points = [ + {"x": _safe_float(point.get("x"), None), "y": _safe_float(point.get("y"), None)} + for point in points or [] + if isinstance(point, dict) + ] + clean_points = [point for point in clean_points if point["x"] is not None and point["y"] is not None] + if len(clean_points) < 2: + return clean_points + source_point = _pin_point(source_pin) + target_point = _pin_point(target_pin) + if source_point is not None: + clean_points[0] = source_point + if target_point is not None: + clean_points[-1] = target_point + return clean_points + + +def _pin_point(pin): + for source in (pin, getattr(pin, "pointer", None)): + if source is None: + continue + x = _safe_float(getattr(source, "x", None), None) + y = _safe_float(getattr(source, "y", None), None) + if x is not None and y is not None: + return {"x": float(x), "y": float(y)} + xya = getattr(source, "xya", None) + if callable(xya): + try: + values = xya() + except TypeError: + values = None + if values and len(values) >= 2: + x = _safe_float(values[0], None) + y = _safe_float(values[1], None) + if x is not None and y is not None: + return {"x": float(x), "y": float(y)} + return None + + +def _guided_route_plan(points: list, radius: float) -> dict: + clean_points = [ + {"x": _safe_float(point.get("x"), None), "y": _safe_float(point.get("y"), None)} + for point in points or [] + if isinstance(point, dict) + ] + clean_points = [point for point in clean_points if point["x"] is not None and point["y"] is not None] + if len(clean_points) < 2: + return {"straights": [], "bends": []} + + trim_radius = max(_safe_float(radius, 0.0) or 0.0, 0.0) + corner_trims = {} + bends = [] + for index in range(1, len(clean_points) - 1): + previous_point = clean_points[index - 1] + corner = clean_points[index] + next_point = clean_points[index + 1] + angle_in = _segment_angle(previous_point, corner) + angle_out = _segment_angle(corner, next_point) + if angle_in is None or angle_out is None or angle_in == angle_out: + continue + turn_delta = abs(((angle_out - angle_in + 180) % 360) - 180) + if turn_delta >= 179: + continue + previous_length = _distance(previous_point, corner) + next_length = _distance(corner, next_point) + trim = min(trim_radius, previous_length / 2.0, next_length / 2.0) + if trim <= 1e-9: + continue + before = _point_toward(corner, previous_point, trim) + after = _point_toward(corner, next_point, trim) + corner_trims[index] = {"before": before, "after": after} + bends.append({ + "start": before, + "corner": {"x": float(corner["x"]), "y": float(corner["y"])}, + "end": after, + "angle_in": angle_in, + "angle_out": angle_out, + }) + + straights = [] + for index in range(len(clean_points) - 1): + start = corner_trims.get(index, {}).get("after", clean_points[index]) + end = corner_trims.get(index + 1, {}).get("before", clean_points[index + 1]) + angle = _segment_angle(start, end) + if angle is None or _distance(start, end) <= 1e-9: + continue + straights.append({ + "start": {"x": float(start["x"]), "y": float(start["y"])}, + "end": {"x": float(end["x"]), "y": float(end["y"])}, + "angle": angle, + }) + + return {"straights": straights, "bends": bends} + + +def _distance(point1: dict, point2: dict) -> float: + return math.hypot(point2["x"] - point1["x"], point2["y"] - point1["y"]) + + +def _point_toward(origin: dict, target: dict, distance: float) -> dict: + total = _distance(origin, target) + if total <= 1e-9: + return {"x": float(origin["x"]), "y": float(origin["y"])} + ratio = distance / total + return { + "x": float(origin["x"] + (target["x"] - origin["x"]) * ratio), + "y": float(origin["y"] + (target["y"] - origin["y"]) * ratio), + } + + +def _segment_angle(point1: dict, point2: dict): + dx = point2["x"] - point1["x"] + dy = point2["y"] - point1["y"] + if abs(dx) < 1e-9 and abs(dy) < 1e-9: + return None + if abs(dx) >= abs(dy): + return 0 if dx >= 0 else 180 + return 90 if dy >= 0 else 270 + + +def _pins_have_same_rotation(pin1, pin2, tolerance: float = 1e-6) -> bool: + angle1 = _pin_angle(pin1) + angle2 = _pin_angle(pin2) + if angle1 is None or angle2 is None: + return False + return abs(((angle1 - angle2 + 180) % 360) - 180) <= tolerance + + def _route_method_name_for_pins(pin1, pin2) -> str: angle1 = _pin_angle(pin1) angle2 = _pin_angle(pin2) @@ -474,144 +476,160 @@ def _route_method_name_for_pins(pin1, pin2) -> str: delta = (angle2 - angle1) % 360 if delta <= 1e-6 or abs(delta - 360) <= 1e-6: return "ubend_p2p" + if abs(delta - 90) <= 1e-6 or abs(delta - 270) <= 1e-6: + return "strt_bend_strt_p2p" if 120 < delta < 240: return "sbend_p2p" return "bend_p2p" - - -def _pin_angle(pin): - for source in (pin, getattr(pin, "pointer", None)): - if source is None: - continue - for attr in ("a", "angle", "rot", "rotation"): - value = getattr(source, attr, None) - if callable(value): - try: - value = value() - except TypeError: - continue - number = _safe_float(value, None) - if number is not None: - return number % 360 - xya = getattr(source, "xya", None) - if callable(xya): - try: - values = xya() - except TypeError: - values = None - if values and len(values) >= 3: - number = _safe_float(values[2], None) - if number is not None: - return number % 360 - return None - - -def _resolve_pdk_asset(pdk_root: str, component: str, prefer_full_gds: bool) -> dict: - key = (component or "").replace("\\", "/").strip("/") - name = key.split("/")[-1] - direct = os.path.join(pdk_root, *key.split("/")) - search_dirs = [direct] if os.path.isdir(direct) else [] - if not search_dirs: - for root, dirs, _ in os.walk(pdk_root): - if os.path.basename(root) == name: - search_dirs.append(root) - dirs.clear() - break - if not search_dirs: - return {} - - search_dir = search_dirs[0] - yaml_path = _first_existing(search_dir, [f"{name}.yml", f"{name}.yaml"]) - gds_path = _find_gds(search_dir, name, prefer_full_gds) - metadata = {} - if yaml_path: - with open(yaml_path, "r", encoding="utf-8") as file: - metadata = yaml.safe_load(file) or {} - return {"yaml_path": yaml_path, "gds_path": gds_path, "metadata": metadata} - - -def _find_gds(search_dir: str, name: str, prefer_full_gds: bool): - preferred = [f"{name}.gds", f"{name}_BB.gds"] if prefer_full_gds else [f"{name}_BB.gds", f"{name}.gds"] - found = _first_existing(search_dir, preferred) - if found: - return found - files = sorted(filename for filename in os.listdir(search_dir) if filename.lower().endswith(".gds")) - full = [filename for filename in files if not filename.lower().endswith("_bb.gds")] - bb = [filename for filename in files if filename.lower().endswith("_bb.gds")] - ordered = full + bb if prefer_full_gds else bb + full - return os.path.join(search_dir, ordered[0]) if ordered else None - - -def _first_existing(search_dir: str, filenames: list): - for filename in filenames: - path = os.path.join(search_dir, filename) - if os.path.exists(path): - return path - return None - - + + +def _pin_angle(pin): + for source in (pin, getattr(pin, "pointer", None)): + if source is None: + continue + for attr in ("a", "angle", "rot", "rotation"): + value = getattr(source, attr, None) + if callable(value): + try: + value = value() + except TypeError: + continue + number = _safe_float(value, None) + if number is not None: + return number % 360 + xya = getattr(source, "xya", None) + if callable(xya): + try: + values = xya() + except TypeError: + values = None + if values and len(values) >= 3: + number = _safe_float(values[2], None) + if number is not None: + return number % 360 + return None + + +def _resolve_pdk_asset(pdk_root: str, component: str, prefer_full_gds: bool) -> dict: + key = (component or "").replace("\\", "/").strip("/") + name = key.split("/")[-1] + direct = os.path.join(pdk_root, *key.split("/")) + search_dirs = [direct] if os.path.isdir(direct) else [] + if not search_dirs: + for root, dirs, _ in os.walk(pdk_root): + if os.path.basename(root) == name: + search_dirs.append(root) + dirs.clear() + break + if not search_dirs: + return {} + + search_dir = search_dirs[0] + yaml_path = _first_existing(search_dir, [f"{name}.yml", f"{name}.yaml"]) + gds_path = _find_gds(search_dir, name, prefer_full_gds) + metadata = {} + if yaml_path: + with open(yaml_path, "r", encoding="utf-8") as file: + metadata = yaml.safe_load(file) or {} + return {"yaml_path": yaml_path, "gds_path": gds_path, "metadata": metadata} + + +def _find_gds(search_dir: str, name: str, prefer_full_gds: bool): + preferred = [f"{name}.gds", f"{name}_BB.gds"] if prefer_full_gds else [f"{name}_BB.gds", f"{name}.gds"] + found = _first_existing(search_dir, preferred) + if found: + return found + files = sorted(filename for filename in os.listdir(search_dir) if filename.lower().endswith(".gds")) + full = [filename for filename in files if not filename.lower().endswith("_bb.gds")] + bb = [filename for filename in files if filename.lower().endswith("_bb.gds")] + ordered = full + bb if prefer_full_gds else bb + full + return os.path.join(search_dir, ordered[0]) if ordered else None + + +def _first_existing(search_dir: str, filenames: list): + for filename in filenames: + path = os.path.join(search_dir, filename) + if os.path.exists(path): + return path + return None + + def _transform_port(px: float, py: float, pa: float, ix: float, iy: float, rotation: float, flip: bool = False, flop: bool = False): - if flip: - py = -py - pa = -pa - if flop: - px = -px - pa = 180 - pa - theta = math.radians(rotation) - c, s = math.cos(theta), math.sin(theta) + if flip: + py = -py + pa = -pa + if flop: + px = -px + pa = 180 - pa + theta = math.radians(rotation) + c, s = math.cos(theta), math.sin(theta) return ix + px * c - py * s, iy + px * s + py * c, (pa + rotation) % 360 +def _layout_rotation(eda_rotation: float) -> float: + return -(_safe_float(eda_rotation, 0.0) or 0.0) + + def _safe_float(value, default=0.0): - if value is None: - return default - if isinstance(value, str) and value.strip().lower() in {"", "none", "null"}: - return default - try: - return float(value) - except (TypeError, ValueError): - return default - - -class _NazcaInterconnectRoute: - """Small mxpic_forge.Route-compatible adapter around Nazca Interconnect.""" - - backend_name = "nazca Interconnect" - - def __init__(self, radius=None, width=None, xs=None, PCB=False): - self.radius = radius - self.width = width - self.xs = xs - self.PCB = PCB - self._interconnect = self._create_interconnect(radius=radius, width=width, xs=xs, PCB=PCB) - - @staticmethod - def _create_interconnect(radius=None, width=None, xs=None, PCB=False): - import nazca as nd - - return nd.interconnects.Interconnect(radius=radius, width=width, xs=xs, PCB=PCB) - - def strt_p2p(self, pin1=None, pin2=None, width=None, xs=None, arrow=True, **kwargs): - return self._interconnect.strt_p2p( - pin1=pin1, - pin2=pin2, - width=self._route_width(width), - xs=self._route_xs(xs), - arrow=arrow, - ) - - def sbend_p2p(self, pin1=None, pin2=None, width=None, radius=None, xs=None, arrow=True, **kwargs): - return self._interconnect.sbend_p2p( - pin1=pin1, - pin2=pin2, - width=self._route_width(width), - radius=self._route_radius(radius), - xs=self._route_xs(xs), - arrow=arrow, - ) - + if value is None: + return default + if isinstance(value, str) and value.strip().lower() in {"", "none", "null"}: + return default + try: + return float(value) + except (TypeError, ValueError): + return default + + +class _NazcaInterconnectRoute: + """Small mxpic_forge.Route-compatible adapter around Nazca Interconnect.""" + + backend_name = "nazca Interconnect" + + def __init__(self, radius=None, width=None, xs=None, PCB=False): + self.radius = radius + self.width = width + self.xs = xs + self.PCB = PCB + self._interconnect = self._create_interconnect(radius=radius, width=width, xs=xs, PCB=PCB) + + @staticmethod + def _create_interconnect(radius=None, width=None, xs=None, PCB=False): + import nazca as nd + + return nd.interconnects.Interconnect(radius=radius, width=width, xs=xs, PCB=PCB) + + def strt_p2p(self, pin1=None, pin2=None, width=None, xs=None, arrow=True, **kwargs): + return self._interconnect.strt_p2p( + pin1=pin1, + pin2=pin2, + width=self._route_width(width), + xs=self._route_xs(xs), + arrow=arrow, + ) + + def sbend_p2p(self, pin1=None, pin2=None, width=None, radius=None, xs=None, arrow=True, **kwargs): + return self._interconnect.sbend_p2p( + pin1=pin1, + pin2=pin2, + width=self._route_width(width), + radius=self._route_radius(radius), + xs=self._route_xs(xs), + arrow=arrow, + ) + def ubend_p2p(self, pin1=None, pin2=None, width=None, radius=None, xs=None, arrow=True, **kwargs): return self._interconnect.ubend_p2p( + pin1=pin1, + pin2=pin2, + width=self._route_width(width), + radius=self._route_radius(radius), + xs=self._route_xs(xs), + arrow=arrow, + ) + + def strt_bend_strt_p2p(self, pin1=None, pin2=None, width=None, radius=None, xs=None, arrow=True, **kwargs): + return self._interconnect.strt_bend_strt_p2p( pin1=pin1, pin2=pin2, width=self._route_width(width), @@ -625,46 +643,46 @@ class _NazcaInterconnectRoute: if route_method is None: route_method = getattr(self._interconnect, "strt_bend_strt_p2p") return route_method( - pin1=pin1, - pin2=pin2, - width=self._route_width(width), - radius=self._route_radius(radius), - xs=self._route_xs(xs), - arrow=arrow, - ) - - def _route_width(self, width): - return self.width if width is None else width - - def _route_radius(self, radius): - return self.radius if radius is None else radius - - def _route_xs(self, xs): - return self.xs if xs is None else xs - - -def _import_route_backend(nd=None): - try: - route = _import_mxpic_forge_route() - setattr(route, "backend_name", getattr(route, "backend_name", "mxpic_forge Route")) - return route - except Exception: - if nd is None: - import nazca as nd - if not hasattr(nd, "interconnects") or not hasattr(nd.interconnects, "Interconnect"): - raise - return _NazcaInterconnectRoute - - -def _import_mxpic_forge_route(): - forge_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "mxpic_forge")) - if os.path.isdir(forge_root) and forge_root not in sys.path: - sys.path.insert(0, forge_root) - loaded_mxpic = sys.modules.get("mxpic") - loaded_path = os.path.abspath(getattr(loaded_mxpic, "__file__", "")) if loaded_mxpic else "" - if loaded_mxpic and forge_root not in loaded_path: - for module_name in list(sys.modules): - if module_name == "mxpic" or module_name.startswith("mxpic."): - del sys.modules[module_name] - from mxpic import Route - return Route + pin1=pin1, + pin2=pin2, + width=self._route_width(width), + radius=self._route_radius(radius), + xs=self._route_xs(xs), + arrow=arrow, + ) + + def _route_width(self, width): + return self.width if width is None else width + + def _route_radius(self, radius): + return self.radius if radius is None else radius + + def _route_xs(self, xs): + return self.xs if xs is None else xs + + +def _import_route_backend(nd=None): + try: + route = _import_mxpic_forge_route() + setattr(route, "backend_name", getattr(route, "backend_name", "mxpic_forge Route")) + return route + except Exception: + if nd is None: + import nazca as nd + if not hasattr(nd, "interconnects") or not hasattr(nd.interconnects, "Interconnect"): + raise + return _NazcaInterconnectRoute + + +def _import_mxpic_forge_route(): + forge_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "mxpic_forge")) + if os.path.isdir(forge_root) and forge_root not in sys.path: + sys.path.insert(0, forge_root) + loaded_mxpic = sys.modules.get("mxpic") + loaded_path = os.path.abspath(getattr(loaded_mxpic, "__file__", "")) if loaded_mxpic else "" + if loaded_mxpic and forge_root not in loaded_path: + for module_name in list(sys.modules): + if module_name == "mxpic" or module_name.startswith("mxpic."): + del sys.modules[module_name] + from mxpic import Route + return Route