This commit is contained in:
xsxx03-art
2026-06-03 10:06:48 +08:00
parent cf28676756
commit 9b4f43f0b1
202 changed files with 14111 additions and 10107 deletions
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
+157 -9
View File
@@ -1,16 +1,33 @@
# -----------------------------------------------------------------------------
# Description: SQLite database initialization and persistence helpers for users, projects, cells, and project metadata.
# Inside functions: connect_db, init_db, get_user, get_user_profile, get_user_auth_by_id, update_user_occupation, update_user_password, add_user_log, list_user_logs
# Developer : Qin Yue @ 2026
# Organization : OptiHK Limited
# -----------------------------------------------------------------------------
# backend/database.py
import sqlite3
import os
from werkzeug.security import generate_password_hash
from datetime import datetime
# Save the database in the backend folder
DB_FILE = os.path.join(os.path.dirname(__file__), "..\\database\\mxpic_data.db")
# Store application data in the shared database folder so all backend modules
# use the same SQLite file regardless of their import path.
DB_FILE = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "database", "mxpic_data.db"))
def connect_db():
"""Open a SQLite connection with row-style access for application data queries."""
conn = sqlite3.connect(DB_FILE, timeout=20)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=20000")
return conn
def init_db():
conn = sqlite3.connect(DB_FILE)
"""Create the user, profile, and audit-log tables required by the backend."""
os.makedirs(os.path.dirname(DB_FILE), exist_ok=True)
conn = connect_db()
cursor = conn.cursor()
# Create Users Table
# Core account table used by login, profile, and role-based PDK access.
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
@@ -18,25 +35,156 @@ def init_db():
password_hash TEXT NOT NULL
)
''')
# Audit log table used by dashboard activity history and backend action
# tracing.
cursor.execute('''
CREATE TABLE IF NOT EXISTS user_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
username TEXT NOT NULL,
action TEXT NOT NULL,
project TEXT,
cell TEXT,
detail TEXT,
ip_address TEXT,
created_at TEXT NOT NULL,
FOREIGN KEY(user_id) REFERENCES users(id)
)
''')
# Lightweight migrations keep older local SQLite files compatible after
# profile, credit, occupation, and role fields were added.
cursor.execute("PRAGMA table_info(users)")
existing_columns = {row[1] for row in cursor.fetchall()}
migrations = {
"created_at": "ALTER TABLE users ADD COLUMN created_at TEXT",
"credits": "ALTER TABLE users ADD COLUMN credits INTEGER NOT NULL DEFAULT 0",
"occupation": "ALTER TABLE users ADD COLUMN occupation TEXT NOT NULL DEFAULT 'intern'",
"user_group": "ALTER TABLE users ADD COLUMN user_group TEXT NOT NULL DEFAULT 'user'",
}
for column, statement in migrations.items():
if column not in existing_columns:
cursor.execute(statement)
now = datetime.utcnow().strftime("%Y-%m-%d")
cursor.execute("UPDATE users SET created_at = ? WHERE created_at IS NULL OR created_at = ''", (now,))
cursor.execute("UPDATE users SET user_group = 'manager' WHERE username = 'admin'")
cursor.execute("UPDATE users SET user_group = 'developers' WHERE username = 'engineer'")
cursor.execute("UPDATE users SET user_group = 'user' WHERE user_group IS NULL OR user_group = ''")
# Insert a test user if the table is empty
# Default local accounts let developers test manager/developer/user access
# without manually editing the database.
cursor.execute("SELECT * FROM users WHERE username = 'admin'")
if not cursor.fetchone():
test_hash = generate_password_hash("123456")
cursor.execute("INSERT INTO users (username, password_hash) VALUES (?, ?)", ("admin", test_hash))
cursor.execute(
"INSERT INTO users (username, password_hash, created_at, credits, occupation, user_group) VALUES (?, ?, ?, ?, ?, ?)",
("admin", test_hash, now, 0, "principle engineer", "manager")
)
print("Test user created. Username: admin | Password: 123456")
cursor.execute("SELECT * FROM users WHERE username = 'engineer'")
if not cursor.fetchone():
test_hash = generate_password_hash("123456")
cursor.execute(
"INSERT INTO users (username, password_hash, created_at, credits, occupation, user_group) VALUES (?, ?, ?, ?, ?, ?)",
("engineer", test_hash, now, 0, "junior engineer", "developers")
)
print("Second test user created. Username: engineer | Password: 123456")
conn.commit()
conn.close()
def get_user(username):
conn = sqlite3.connect(DB_FILE)
"""Fetch login credentials and account status for a username."""
conn = connect_db()
cursor = conn.cursor()
cursor.execute("SELECT id, username, password_hash FROM users WHERE username = ?", (username,))
cursor.execute("SELECT id, username, password_hash, user_group FROM users WHERE username = ?", (username,))
user = cursor.fetchone()
conn.close()
return user
def get_user_profile(user_id):
"""Fetch editable profile details for an authenticated user."""
conn = connect_db()
cursor = conn.cursor()
cursor.execute(
"SELECT id, username, created_at, credits, occupation, user_group FROM users WHERE id = ?",
(user_id,)
)
user = cursor.fetchone()
conn.close()
return user
def get_user_auth_by_id(user_id):
"""Fetch password and account metadata by user id."""
conn = connect_db()
cursor = conn.cursor()
cursor.execute("SELECT id, username, password_hash FROM users WHERE id = ?", (user_id,))
user = cursor.fetchone()
conn.close()
return user
def update_user_occupation(user_id, occupation):
"""Persist a user profile occupation update."""
conn = connect_db()
cursor = conn.cursor()
cursor.execute("UPDATE users SET occupation = ? WHERE id = ?", (occupation, user_id))
conn.commit()
conn.close()
def update_user_password(user_id, password):
"""Persist a newly hashed password for a user."""
conn = connect_db()
cursor = conn.cursor()
cursor.execute("UPDATE users SET password_hash = ? WHERE id = ?", (generate_password_hash(password), user_id))
conn.commit()
conn.close()
def add_user_log(user_id, username, action, project=None, cell=None, detail=None, ip_address=None):
"""Record an auditable user action with optional project and cell context."""
conn = connect_db()
cursor = conn.cursor()
cursor.execute(
'''
INSERT INTO user_logs (user_id, username, action, project, cell, detail, ip_address, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''',
(
user_id,
username,
action,
project,
cell,
detail,
ip_address,
datetime.utcnow().isoformat(timespec="seconds") + "Z"
)
)
conn.commit()
conn.close()
def list_user_logs(user_id, limit=200):
"""Return recent audit-log entries for display in the account page."""
conn = connect_db()
cursor = conn.cursor()
cursor.execute(
'''
SELECT id, action, project, cell, detail, ip_address, created_at
FROM user_logs
WHERE user_id = ?
ORDER BY id DESC
LIMIT ?
''',
(user_id, limit)
)
rows = cursor.fetchall()
conn.close()
return rows
if __name__ == "__main__":
# Allow this module to be run directly when a fresh local database needs to
# be initialized outside the Flask server.
init_db()
print("Database initialized successfully.")
print("Database initialized successfully.")
-79
View File
@@ -1,79 +0,0 @@
import os
import yaml
from collections import OrderedDict
from flask import Flask, jsonify, send_from_directory, request, redirect, url_for, session, render_template
from werkzeug.security import check_password_hash
import database
# --- Path Configurations ---
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
FRONTEND_DIR = os.path.join(BASE_DIR, '..', 'frontend')
YML_PATH = os.path.join(BASE_DIR, '..', 'mxpic', 'PDKs', 'Silterra', 'directories.yaml')
COMPS_ROOT = os.path.join(BASE_DIR, '..', 'mxpic', 'PDKs', 'Silterra')
# Define where your new icons folder is located (adjust if it's placed elsewhere)
ICONS_DIR = os.path.join(BASE_DIR, 'icons')
app = Flask(__name__, template_folder=FRONTEND_DIR, static_folder=FRONTEND_DIR)
app.secret_key = 'super_secret_mxpic_key'
app.json.sort_keys = False
database.init_db()
# ... [Keep countSpaces and buildTree exactly as they are] ...
def findComps(baseDir):
"""Scan component folders, return map of paths -> component info."""
compMap = {}
refDir = baseDir
for root, dirs, files in os.walk(baseDir):
ymlFiles = [f for f in files if f.endswith('.yml')]
if ymlFiles:
parentDir = os.path.dirname(root)
relPath = os.path.relpath(parentDir, refDir)
parts = () if relPath == '.' else tuple(relPath.split(os.sep))
compName = os.path.basename(root)
# Extract the category (the mother folder's name)
category = os.path.basename(parentDir)
compMap[parts] = {
'folder': compName,
'yml': ymlFiles[0],
'category': category # Save the category to the map
}
dirs.clear()
return compMap
def addCompsToTree(compMap):
"""Build a completely fresh tree from scratch and insert component nodes."""
fresh_tree = OrderedDict()
for pathSeg, compItem in compMap.items():
compName = compItem['folder']
curNode = fresh_tree
for seg in pathSeg:
if seg not in curNode:
curNode[seg] = OrderedDict()
curNode = curNode[seg]
curNode[compName] = OrderedDict({
"__type__": "component",
"__name__": compName,
"__yml__": compItem['yml'],
"__category__": compItem['category'] # Inject category into the tree
})
return fresh_tree
if os.path.isdir(COMPS_ROOT):
compMap = findComps(COMPS_ROOT)
fresh_tree = addCompsToTree(compMap)
print(compMap)
print(fresh_tree)
+23
View File
@@ -0,0 +1,23 @@
# -----------------------------------------------------------------------------
# Description: Directory configuration metadata used to locate PDK and project resources.
# Inside functions: N/A - declarative YAML metadata/configuration.
# Developer : Qin Yue @ 2026
# Organization : OptiHK Limited
# -----------------------------------------------------------------------------
level : 4
root :
- PDK_libs
- primitives
- directional_couplers
- edge_couplers
- crossings
- multimode_interferometers
- photodectors
- compotites
- MZIs
- electronics
- resistors
- capacitors
- others
- logos
+84
View File
@@ -0,0 +1,84 @@
# -----------------------------------------------------------------------------
# Description: Backend integration wrapper for required mxpic_router project GDS generation.
# Inside functions: build_project_gds, _build_with_mxpic_router, _load_project_cells
# Developer : Qin Yue @ 2026
# Organization : OptiHK Limited
# -----------------------------------------------------------------------------
import os
from dataclasses import dataclass, field
from typing import Dict, List
import yaml
from router_dependency import require_router_stack
@dataclass
class BuildResult:
"""Container for GDS build output paths, status details, and engine metadata."""
output_path: str
engine: str
cells_built: List[str] = field(default_factory=list)
warnings: List[str] = field(default_factory=list)
def build_project_gds(
project_dir: str,
output_path: str,
pdk_public_root: str,
technology_manifest_path: str = None,
prefer_full_gds: bool = False,
) -> BuildResult:
"""Build a hierarchical project GDS from saved cell YAML files with mxpic_router."""
cells = _load_project_cells(project_dir)
if not cells:
raise ValueError("No saved cell YAML files found for this project")
return _build_with_mxpic_router(
project_dir,
output_path,
pdk_public_root,
technology_manifest_path,
prefer_full_gds,
)
def _build_with_mxpic_router(
project_dir: str,
output_path: str,
pdk_root: str,
technology_manifest_path: str,
prefer_full_gds: bool,
) -> BuildResult:
"""Delegate project GDS generation to the required external mxpic_router package."""
require_router_stack()
from mxpic_router import build_project_gds as build_routed_project_gds
result = build_routed_project_gds(
project_dir=project_dir,
output_path=output_path,
pdk_root=pdk_root,
technology_manifest_path=technology_manifest_path,
prefer_full_gds=prefer_full_gds,
)
return BuildResult(
output_path=result.get("output_path", output_path),
engine=result.get("engine", "mxpic_router"),
cells_built=result.get("cells_built", []),
warnings=result.get("warnings", []),
)
def _load_project_cells(project_dir: str) -> Dict[str, dict]:
"""Load saved cell YAML documents from a project directory."""
cells = {}
for filename in sorted(os.listdir(project_dir)):
if not filename.lower().endswith((".yml", ".yaml")):
continue
path = os.path.join(project_dir, filename)
with open(path, "r", encoding="utf-8") as file:
data = yaml.safe_load(file) or {}
cell_name = str(data.get("name") or os.path.splitext(filename)[0])
cells[cell_name] = data
return cells
Binary file not shown.
+78
View File
@@ -0,0 +1,78 @@
# -----------------------------------------------------------------------------
# Description: Role-aware PDK access control utilities for resolving which PDK roots a user or project may use.
# Inside functions: normalize_user_group, pdk_root_for_group, pdk_root_for_session, prefer_full_gds_for_session, create_export_path, cleanup_expired_exports
# Developer : Qin Yue @ 2026
# Organization : OptiHK Limited
# -----------------------------------------------------------------------------
import os
import time
import shutil
import uuid
# Role names control which PDK tree the backend exposes for browsing, preview,
# and GDS export requests.
MANAGER_GROUP = "manager"
DEVELOPER_GROUP = "developers"
USER_GROUP = "user"
ALLOWED_GROUPS = {MANAGER_GROUP, DEVELOPER_GROUP, USER_GROUP}
def normalize_user_group(user_group: str) -> str:
"""Normalize a session user group into the supported PDK access scope."""
group = (user_group or USER_GROUP).strip().lower()
return group if group in ALLOWED_GROUPS else USER_GROUP
def pdk_root_for_group(user_group: str, repo_root: str) -> str:
"""Resolve the PDK library root that belongs to a user group."""
group = normalize_user_group(user_group)
# Managers may access the private atlas PDK tree; all other roles stay on
# the public PDK tree used for normal project editing.
if group == MANAGER_GROUP:
return os.path.abspath(os.environ.get(
"MXPIC_PDK_ATLAS_ROOT",
os.path.join(repo_root, "opt_pdk_atlas", "foundries"),
))
return os.path.abspath(os.environ.get(
"MXPIC_PDK_PUBLIC_ROOT",
os.path.join(repo_root, "opt_pdk_public", "foundries"),
))
def pdk_root_for_session(session_obj, repo_root: str) -> str:
"""Resolve the active PDK root from the current Flask session."""
return pdk_root_for_group(session_obj.get("user_group"), repo_root)
def prefer_full_gds_for_session(session_obj) -> bool:
"""Decide whether resolved PDK assets should prefer full GDS files."""
return normalize_user_group(session_obj.get("user_group")) == MANAGER_GROUP
def create_export_path(export_root: str, project_name: str) -> tuple[str, str, str]:
"""Create a unique export directory and filename for generated downloads."""
export_id = uuid.uuid4().hex
filename = f"{project_name}.gds"
export_dir = os.path.abspath(os.path.join(export_root, export_id))
os.makedirs(export_dir, exist_ok=True)
return export_id, filename, os.path.join(export_dir, filename)
def cleanup_expired_exports(export_root: str, max_age_seconds: int = 86400) -> None:
"""Remove old export folders so temporary download storage stays bounded."""
if not os.path.isdir(export_root):
return
now = time.time()
# Each export is stored in its own UUID directory, so old folders can be
# removed independently without touching active project data.
for name in os.listdir(export_root):
path = os.path.join(export_root, name)
if not os.path.isdir(path):
continue
try:
age = now - os.path.getmtime(path)
if age > max_age_seconds:
shutil.rmtree(path, ignore_errors=True)
except OSError:
continue
+61
View File
@@ -0,0 +1,61 @@
# -----------------------------------------------------------------------------
# Description: Routed layout preview helpers that invoke the routing/build flow and return preview artifacts.
# Inside functions: create_routed_layout_svg, layout_has_links
# Developer : Qin Yue @ 2026
# Organization : OptiHK Limited
# -----------------------------------------------------------------------------
import os
import tempfile
import yaml
from router_dependency import require_router_stack
def create_routed_layout_svg(
yaml_content: str,
output_path: str,
pdk_root: str,
project_dir: str,
technology_manifest_path: str = None,
prefer_full_gds: bool = False,
) -> str:
"""Create an SVG preview from routed GDS geometry generated by mxpic_router."""
require_router_stack(require_gdstk=True)
import gdstk
layout = yaml.safe_load(yaml_content) or {}
cell_name = str(layout.get("name") or "layout")
from mxpic_router import build_project_gds
os.makedirs(os.path.dirname(output_path), exist_ok=True)
# Build into a temporary GDS first, then convert the generated top cell into
# the SVG preview consumed by the canvas.
with tempfile.TemporaryDirectory(prefix="mxpic_routed_preview_") as temp_dir:
temp_gds = os.path.join(temp_dir, f"{cell_name}.gds")
build_project_gds(
project_dir=project_dir,
output_path=temp_gds,
pdk_root=pdk_root,
technology_manifest_path=technology_manifest_path,
prefer_full_gds=prefer_full_gds,
target_cell_name=cell_name,
)
library = gdstk.read_gds(temp_gds)
top_cells = library.top_level()
if not top_cells:
raise RuntimeError("Routed preview GDS has no top-level cell")
top_cells[0].write_svg(output_path)
return output_path
def layout_has_links(yaml_content: str) -> bool:
"""Detect whether a layout YAML document contains routed bundle links."""
layout = yaml.safe_load(yaml_content) or {}
# Any bundle link means preview generation must use the routed builder
# instead of simple component placement.
for bundle in (layout.get("bundles") or {}).values():
links = bundle.get("links") or []
if links:
return True
return False
+77
View File
@@ -0,0 +1,77 @@
# -----------------------------------------------------------------------------
# Description: Build-time mxpic_router runtime dependency validation helpers.
# Inside functions: ensure_router_path, require_router_stack
# Developer : Qin Yue @ 2026
# Organization : OptiHK Limited
# -----------------------------------------------------------------------------
import importlib
import os
import sys
from dataclasses import dataclass, field
from typing import List
@dataclass
class RouterStackStatus:
"""Summary of the router stack checks completed for a build action."""
ok: bool
router_root: str
checked: List[str] = field(default_factory=list)
class RouterStackUnavailable(RuntimeError):
"""Raised when a build action needs the external router stack but it is absent."""
pass
def ensure_router_path() -> str:
"""Add the sibling mxpic_router checkout to import resolution when present."""
router_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "mxpic_router"))
if os.path.isdir(router_root) and router_root not in sys.path:
sys.path.insert(0, router_root)
return router_root
def require_router_stack(require_gdstk: bool = False) -> RouterStackStatus:
"""Validate the runtime stack required for build-time router actions."""
router_root = ensure_router_path()
checked = []
missing = []
try:
importlib.import_module("mxpic_router")
checked.append("mxpic_router")
except Exception as exc:
missing.append(f"mxpic_router: {exc}")
try:
importlib.import_module("nazca")
checked.append("nazca")
except Exception as exc:
missing.append(f"nazca: {exc}")
if require_gdstk:
try:
importlib.import_module("gdstk")
checked.append("gdstk")
except Exception as exc:
missing.append(f"gdstk: {exc}")
try:
router_builder = importlib.import_module("mxpic_router.builder")
route_factory = getattr(router_builder, "_import_mxpic_forge_route")
route_factory()
checked.append("mxpic_forge Route")
except Exception as exc:
missing.append(f"mxpic_forge Route: {exc}")
if missing:
details = "; ".join(missing)
raise RouterStackUnavailable(
"Required mxpic_router runtime stack is unavailable. "
"Build actions require the matched mxpic_router and mxpic_forge checkouts, "
f"Nazca, and gdstk for SVG preview generation. Details: {details}"
)
return RouterStackStatus(ok=True, router_root=router_root, checked=checked)
+808 -137
View File
File diff suppressed because it is too large Load Diff
-287
View File
@@ -1,287 +0,0 @@
# import os
# import yaml
# from collections import OrderedDict
# from flask import Flask, jsonify, send_from_directory, request, redirect, url_for, session, render_template
# from werkzeug.security import check_password_hash
# import database # Imports the database.py you created earlier
# # --- Path Configurations ---
# BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# FRONTEND_DIR = os.path.join(BASE_DIR, '..', 'frontend')
# # Use os.path.join exclusively for cross-platform safety
# YML_PATH = os.path.join(BASE_DIR, '..', 'mxpic', 'PDKs', 'Silterra', 'directories.yaml')
# COMPS_ROOT = os.path.join(BASE_DIR, '..', 'mxpic', 'PDKs', 'Silterra')
# # Initialize Flask, pointing to the frontend folder for HTML/CSS/JS
# app = Flask(__name__, template_folder=FRONTEND_DIR, static_folder=FRONTEND_DIR)
# app.secret_key = 'super_secret_mxpic_key' # Required for session management
# app.json.sort_keys = False # Keep dictionary order
# # Ensure database tables exist when the server boots
# database.init_db()
# # --- YAML & PDK Parsing Helper Functions (Unchanged) ---
# def countSpaces(line):
# """Count leading spaces (tab=4)."""
# expanded = line.expandtabs(4)
# return len(expanded) - len(expanded.lstrip(' '))
# def buildTree(filepath):
# """Build nested tree from indented yaml."""
# if not os.path.exists(filepath):
# return OrderedDict()
# with open(filepath, 'r', encoding='utf-8') as f:
# lines = f.readlines()
# rootIdx = None
# for i, line in enumerate(lines):
# if line.strip().startswith('root') and ':' in line.strip():
# rootIdx = i
# break
# if rootIdx is None:
# return OrderedDict()
# entries = []
# for line in lines[rootIdx + 1:]:
# stripped = line.strip()
# if not stripped or stripped.startswith('#'):
# continue
# if stripped.startswith('- '):
# spaceNum = countSpaces(line)
# # FIX 1: Strip trailing colons off the string so 'composites:' becomes 'composites'
# name = stripped[2:].strip().rstrip(':')
# if name:
# entries.append((spaceNum, name))
# if not entries:
# return OrderedDict()
# minIndent = min(indent for indent, _ in entries)
# nest = OrderedDict()
# levelStack = [(minIndent - 1, nest)]
# for spaceNum, name in entries:
# while levelStack and levelStack[-1][0] >= spaceNum:
# levelStack.pop()
# parent = levelStack[-1][1]
# child = OrderedDict()
# parent[name] = child
# levelStack.append((spaceNum, child))
# return nest
# def addCompsToTree(compMap):
# """
# Build a completely fresh tree from scratch and insert component nodes.
# No previous tree object or inspection required.
# """
# # Initialize a clean, empty root tree
# fresh_tree = OrderedDict()
# for pathSeg, compItem in compMap.items():
# compName = compItem['folder']
# curNode = fresh_tree
# # Sequentially build the nested path segments dynamically
# for seg in pathSeg:
# if seg not in curNode:
# curNode[seg] = OrderedDict()
# curNode = curNode[seg]
# # Place the component metadata dictionary into its leaf node
# curNode[compName] = OrderedDict({
# "__type__": "component",
# "__name__": compName,
# "__yml__": compItem['yml']
# })
# return fresh_tree
import os
import yaml
from collections import OrderedDict
from flask import Flask, jsonify, send_from_directory, request, redirect, url_for, session, render_template
from werkzeug.security import check_password_hash
import database
# --- Path Configurations ---
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
FRONTEND_DIR = os.path.join(BASE_DIR, '..', 'frontend')
YML_PATH = os.path.join(BASE_DIR, '..', 'mxpic', 'PDKs', 'Silterra', 'directories.yaml')
COMPS_ROOT = os.path.join(BASE_DIR, '..', 'mxpic', 'PDKs', 'Silterra')
# Define where your new icons folder is located (adjust if it's placed elsewhere)
ICONS_DIR = os.path.join(BASE_DIR, 'icons')
app = Flask(__name__, template_folder=FRONTEND_DIR, static_folder=FRONTEND_DIR)
app.secret_key = 'super_secret_mxpic_key'
app.json.sort_keys = False
database.init_db()
# ... [Keep countSpaces and buildTree exactly as they are] ...
def findComps(baseDir):
"""Scan component folders, return map of paths -> component info."""
compMap = {}
refDir = baseDir
for root, dirs, files in os.walk(baseDir):
ymlFiles = [f for f in files if f.endswith('.yml')]
if ymlFiles:
parentDir = os.path.dirname(root)
relPath = os.path.relpath(parentDir, refDir)
parts = () if relPath == '.' else tuple(relPath.split(os.sep))
compName = os.path.basename(root)
# Extract the category (the mother folder's name)
category = os.path.basename(parentDir)
compMap[parts] = {
'folder': compName,
'yml': ymlFiles[0],
'category': category # Save the category to the map
}
dirs.clear()
return compMap
def addCompsToTree(compMap):
"""Build a completely fresh tree from scratch and insert component nodes."""
fresh_tree = OrderedDict()
for pathSeg, compItem in compMap.items():
compName = compItem['folder']
curNode = fresh_tree
for seg in pathSeg:
if seg not in curNode:
curNode[seg] = OrderedDict()
curNode = curNode[seg]
curNode[compName] = OrderedDict({
"__type__": "component",
"__name__": compName,
"__yml__": compItem['yml'],
"__category__": compItem['category'] # Inject category into the tree
})
return fresh_tree
# ... [Keep readCompYaml and Page Routes exactly as they are] ...
# --- API ROUTES (Library, Components & Icons) ---
@app.route('/api/icon/<category>')
def getIcon(category):
"""Serve the icon corresponding to the component category."""
# Look for an image matching the category name (e.g., edge_coupler.png)
for ext in ('.png', '.svg', '.jpg'):
icon_path = os.path.join(ICONS_DIR, f"{category}{ext}")
if os.path.exists(icon_path):
return send_from_directory(ICONS_DIR, f"{category}{ext}")
# Optional: Return a default fallback icon if the specific one is missing
fallback = os.path.join(ICONS_DIR, "default.png")
if os.path.exists(fallback):
return send_from_directory(ICONS_DIR, "default.png")
return jsonify({"error": "Icon not found"}), 404
# ... [Keep existing API routes below] ...
def readCompYaml(compName):
"""Load YAML from component folder."""
for root, dirs, files in os.walk(COMPS_ROOT):
if os.path.basename(root) == compName:
dirs.clear()
ymlFiles = [f for f in files if f.endswith('.yml')]
if ymlFiles:
ymlPath = os.path.join(root, ymlFiles[0])
with open(ymlPath, 'r', encoding='utf-8') as f:
return yaml.safe_load(f)
return None
# --- AUTHENTICATION & PAGE ROUTES ---
@app.route('/')
def home():
"""Route to login page, or bypass to dashboard if already authenticated."""
if 'user_id' in session:
return redirect(url_for('dashboard'))
return render_template('login.html')
@app.route('/login', methods=['POST'])
def login():
"""Verify credentials against the database."""
username = request.form.get('username')
password = request.form.get('password')
user = database.get_user(username)
# Verify hash from database matches entered password
if user and check_password_hash(user[2], password):
session['user_id'] = user[0]
session['username'] = user[1]
return redirect(url_for('dashboard'))
else:
return render_template('login.html', error="Invalid username or password")
@app.route('/dashboard')
def dashboard():
"""User project list."""
if 'user_id' not in session:
return redirect(url_for('home'))
return render_template('dashboard.html', username=session['username'])
@app.route('/canvas')
def canvas():
"""The main EDA editor."""
if 'user_id' not in session:
return redirect(url_for('home'))
# Note: Ensure your old index.html is renamed to canvas.html in the frontend folder
return render_template('canvas.html')
@app.route('/logout')
def logout():
"""Clear session and return to login."""
session.clear()
return redirect(url_for('home'))
# --- API ROUTES (Library & Components) ---
@app.route('/api/library')
def getLib():
"""Get library structure."""
# tree = buildTree(YML_PATH)
if os.path.isdir(COMPS_ROOT):
compMap = findComps(COMPS_ROOT)
fresh_tree = addCompsToTree(compMap)
return jsonify(fresh_tree)
@app.route('/api/component/<component_name>')
def getComp(component_name):
"""Return component YAML data."""
data = readCompYaml(component_name)
if data is None:
return jsonify({"error": "Component not found"}), 404
return jsonify(data)
@app.route('/api/component/<component_name>/image')
def getCompImg(component_name):
"""Return first image in component folder."""
for root, dirs, files in os.walk(COMPS_ROOT):
if os.path.basename(root) == component_name:
dirs.clear()
for ext in ('.png', '.jpg', '.jpeg', '.svg'):
for f in files:
if f.lower().endswith(ext):
return send_from_directory(root, f)
break
return jsonify({"error": "No image found"}), 404
if __name__ == '__main__':
print("Starting mxpic EDA Server on http://127.0.0.1:3000")
app.run(host='127.0.0.1', port=3000, debug=True)
+41
View File
@@ -0,0 +1,41 @@
# -----------------------------------------------------------------------------
# Description: Technology manifest loading and fallback helpers for routing layers and cross-section defaults.
# Inside functions: technology_manifest_path, read_technology_manifest
# Developer : Qin Yue @ 2026
# Organization : OptiHK Limited
# -----------------------------------------------------------------------------
import os
import yaml
class TechnologyManifestError(Exception):
"""Exception raised when a technology manifest cannot be found or parsed."""
pass
def technology_manifest_path(pdks_root: str, foundry: str, technology: str) -> str:
"""Build the expected path to a foundry/technology manifest YAML file."""
base = os.path.abspath(pdks_root)
path = os.path.abspath(os.path.join(base, foundry, technology, "technology.yml"))
# Keep user-provided foundry/technology names from escaping the configured
# PDK root.
if not path.startswith(base + os.sep):
raise TechnologyManifestError("Invalid technology path")
return path
def read_technology_manifest(pdks_root: str, foundry: str, technology: str) -> dict:
"""Load and validate the active technology manifest for the frontend."""
path = technology_manifest_path(pdks_root, foundry, technology)
if not os.path.exists(path):
raise TechnologyManifestError("technology manifest not generated; run mxpic_forge technology export workflow")
with open(path, "r", encoding="utf-8") as file:
manifest = yaml.safe_load(file) or {}
# The frontend route editor depends on both xsection definitions and global
# defaults being present before it can safely style or build links.
if not isinstance(manifest.get("xsections"), dict):
raise TechnologyManifestError("technology manifest is missing xsections")
if not isinstance(manifest.get("defaults"), dict):
raise TechnologyManifestError("technology manifest is missing defaults")
return manifest