Files
mxpic_EDA/backend/server.py
T

571 lines
19 KiB
Python

import os
import re
import shutil
import json
import yaml
from collections import OrderedDict
from functools import wraps
from flask import Flask, jsonify, send_from_directory, request, redirect, url_for, session, render_template
from werkzeug.security import check_password_hash
import database
from flask import Response
# --- Path Configurations ---
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
FRONTEND_DIR = os.path.join(BASE_DIR, '..', 'frontend')
YML_PATH = os.path.join(BASE_DIR, '..', 'mxpic', 'PDKs', 'Silterra', 'directories.yaml')
COMPS_ROOT = os.path.join(BASE_DIR, '..', 'mxpic', 'PDKs', 'Silterra')
# Define where your new icons folder is located (adjust if it's placed elsewhere)
ICONS_DIR = os.path.join(BASE_DIR, 'icons')
#build layout save path
DATABASE_ROOT = os.path.abspath(os.path.join(BASE_DIR, '..', 'database'))
app = Flask(__name__, template_folder=FRONTEND_DIR, static_folder=FRONTEND_DIR)
app.secret_key = os.environ.get('MXPIC_SECRET_KEY', 'change_me_for_intranet_deployment')
app.config.update(
SESSION_COOKIE_HTTPONLY=True,
SESSION_COOKIE_SAMESITE='Lax',
SESSION_COOKIE_SECURE=os.environ.get('MXPIC_COOKIE_SECURE', '0').lower() in {'1', 'true', 'yes'},
)
app.json.sort_keys = False
database.init_db()
def login_required_json(view_func):
@wraps(view_func)
def wrapper(*args, **kwargs):
if 'user_id' not in session:
return jsonify({"error": "Authentication required"}), 401
return view_func(*args, **kwargs)
return wrapper
def request_ip():
forwarded_for = request.headers.get('X-Forwarded-For', '')
if forwarded_for:
return forwarded_for.split(',')[0].strip()
return request.remote_addr
def record_action(action, project=None, cell=None, detail=None):
if 'user_id' not in session:
return
if isinstance(detail, (dict, list)):
detail = json.dumps(detail, ensure_ascii=False)
elif detail is not None:
detail = str(detail)
database.add_user_log(
session.get('user_id'),
session.get('username', 'unknown'),
action,
project=safe_name(project, '') if project else None,
cell=safe_name(cell, '') if cell else None,
detail=detail,
ip_address=request_ip()
)
def safe_name(value, fallback):
"""Keep user/project/cell names filesystem-friendly without changing display names."""
value = (value or '').strip()
if not value:
value = fallback
value = re.sub(r'[^A-Za-z0-9_.-]+', '_', value)
value = value.strip('._')
return value or fallback
def user_layout_root():
username = safe_name(session.get('username'), 'anonymous')
return os.path.join(DATABASE_ROOT, username, 'layout')
def project_root(project_name):
return os.path.join(user_layout_root(), safe_name(project_name, 'project_1'))
def cell_file_path(project_name, cell_name):
return os.path.join(project_root(project_name), f"{safe_name(cell_name, 'canvas_1')}.yml")
def project_meta_path(project_name):
return os.path.join(project_root(project_name), ".project.json")
def read_project_meta(project_name):
path = project_meta_path(project_name)
if not os.path.exists(path):
return {}
with open(path, 'r', encoding='utf-8') as f:
return json.load(f)
def write_project_meta(project_name, meta):
os.makedirs(project_root(project_name), exist_ok=True)
with open(project_meta_path(project_name), 'w', encoding='utf-8') as f:
json.dump(meta, f, indent=2)
# ... [Keep countSpaces and buildTree exactly as they are] ...
def findComps(baseDir):
"""Scan component folders, return map of paths -> component info."""
compMap = {}
refDir = baseDir
for root, dirs, files in os.walk(baseDir):
ymlFiles = [f for f in files if f.endswith('.yml')]
if ymlFiles:
parentDir = os.path.dirname(root)
relPath = os.path.relpath(parentDir, refDir)
parts = () if relPath == '.' else tuple(relPath.split(os.sep))
compName = os.path.basename(root)
# Extract the category (the mother folder's name)
category = os.path.basename(parentDir)
# Include compName in the key so multiple cells in one category do not overwrite each other.
compMap[parts + (compName,)] = {
'folder': compName,
'yml': ymlFiles[0],
'category': category # Save the category to the map
}
dirs.clear()
return compMap
def addCompsToTree(compMap):
"""Build a completely fresh tree from scratch and insert component nodes."""
fresh_tree = OrderedDict()
for mapKey, compItem in compMap.items():
pathSeg = mapKey[:-1]
compName = compItem['folder']
curNode = fresh_tree
for seg in pathSeg:
if seg not in curNode:
curNode[seg] = OrderedDict()
curNode = curNode[seg]
curNode[compName] = OrderedDict({
"__type__": "component",
"__name__": compName,
"__yml__": compItem['yml'],
"__category__": compItem['category'] # Inject category into the tree
})
return fresh_tree
# ... [Keep readCompYaml and Page Routes exactly as they are] ...
# --- API ROUTES (Library, Components & Icons) ---
@app.route('/api/icon/<category>')
def getIcon(category):
"""Serve the icon corresponding to the component category."""
for ext in ('.png', '.svg', '.jpg'):
icon_path = os.path.join(ICONS_DIR, f"{category}{ext}")
if os.path.exists(icon_path):
return send_from_directory(ICONS_DIR, f"{category}{ext}")
fallback = os.path.join(ICONS_DIR, "default.png")
if os.path.exists(fallback):
return send_from_directory(ICONS_DIR, "default.png")
# return png if not found
transparent_png = (
b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01'
b'\x08\x06\x00\x00\x00\x1f\x15\xc4\x89\x00\x00\x00\nIDATx\x9cc\x00\x01'
b'\x00\x00\x05\x00\x01\r\n\xf4\xc0\x00\x00\x00\x00IEND\xaeB`\x82'
)
return Response(transparent_png, mimetype='image/png')
# ... [Keep existing API routes below] ...
def readCompYaml(compName):
"""Load YAML from component folder."""
for root, dirs, files in os.walk(COMPS_ROOT):
if os.path.basename(root) == compName:
dirs.clear()
ymlFiles = [f for f in files if f.endswith('.yml')]
if ymlFiles:
ymlPath = os.path.join(root, ymlFiles[0])
with open(ymlPath, 'r', encoding='utf-8') as f:
return yaml.safe_load(f)
return None
# --- AUTHENTICATION & PAGE ROUTES ---
@app.route('/')
def home():
"""Route to login page, or bypass to dashboard if already authenticated."""
if 'user_id' in session:
return redirect(url_for('dashboard'))
return render_template('login.html')
@app.route('/login', methods=['POST'])
def login():
"""Verify credentials against the database."""
username = request.form.get('username')
password = request.form.get('password')
user = database.get_user(username)
# Verify hash from database matches entered password
if user and check_password_hash(user[2], password):
session['user_id'] = user[0]
session['username'] = user[1]
record_action('login')
return redirect(url_for('dashboard'))
else:
return render_template('login.html', error="Invalid username or password")
@app.route('/dashboard')
def dashboard():
"""User project list."""
if 'user_id' not in session:
return redirect(url_for('home'))
return render_template('dashboard.html', username=session['username'])
@app.route('/canvas')
def canvas():
"""The main EDA editor."""
if 'user_id' not in session:
return redirect(url_for('home'))
# Note: Ensure your old index.html is renamed to canvas.html in the frontend folder
return render_template('canvas.html')
@app.route('/logout')
def logout():
"""Clear session and return to login."""
record_action('logout')
session.clear()
return redirect(url_for('home'))
@app.route('/api/health')
def health_check():
return jsonify({"status": "ok", "service": "mxpic_eda"})
@app.route('/api/technologies', methods=['GET'])
@login_required_json
def list_technologies():
"""List technology choices from mxpic/PDKs/<foundry>/<technology>."""
technologies = []
pdks_root = os.path.join(BASE_DIR, '..', 'mxpic', 'PDKs')
if os.path.isdir(pdks_root):
for foundry in sorted(os.listdir(pdks_root)):
foundry_path = os.path.join(pdks_root, foundry)
if not os.path.isdir(foundry_path):
continue
for technology in sorted(os.listdir(foundry_path)):
technology_path = os.path.join(foundry_path, technology)
if not os.path.isdir(technology_path):
continue
technologies.append({
"foundry": foundry,
"technology": technology,
"id": f"{foundry}/{technology}",
"label": f"{foundry} / {technology}"
})
return jsonify({"technologies": technologies})
@app.route('/api/profile', methods=['GET', 'PATCH'])
@login_required_json
def account_profile():
occupations = {'intern', 'senior engineer', 'junior engineer', 'principle engineer'}
user_id = session.get('user_id')
if request.method == 'PATCH':
data = request.get_json(silent=True) or {}
occupation = (data.get('occupation') or '').strip().lower()
if occupation not in occupations:
return jsonify({"error": "Invalid occupation"}), 400
database.update_user_occupation(user_id, occupation)
record_action('profile.update_occupation', detail={"occupation": occupation})
profile = database.get_user_profile(user_id)
if not profile:
return jsonify({"error": "Profile not found"}), 404
return jsonify({
"id": f"mx-{int(profile[0]):06d}",
"username": profile[1],
"created_at": profile[2],
"credits": profile[3] or 0,
"occupation": profile[4] or "intern",
"occupations": sorted(occupations)
})
@app.route('/api/profile/password', methods=['POST'])
@login_required_json
def change_password():
data = request.get_json(silent=True) or {}
current_password = data.get('current_password') or ''
new_password = data.get('new_password') or ''
if len(new_password) < 6:
return jsonify({"error": "New password must be at least 6 characters"}), 400
user = database.get_user_auth_by_id(session.get('user_id'))
if not user or not check_password_hash(user[2], current_password):
return jsonify({"error": "Current password is incorrect"}), 400
database.update_user_password(user[0], new_password)
record_action('profile.change_password')
return jsonify({"message": "Password updated"})
@app.route('/api/logs', methods=['GET', 'POST'])
@login_required_json
def user_logs():
if request.method == 'POST':
data = request.get_json(silent=True) or {}
action = safe_name(data.get('action'), '')
if not action:
return jsonify({"error": "Action is required"}), 400
record_action(
action,
project=data.get('project'),
cell=data.get('cell'),
detail=data.get('detail')
)
return jsonify({"message": "logged"})
try:
limit = min(500, max(1, int(request.args.get('limit', 200))))
except ValueError:
limit = 200
rows = database.list_user_logs(session.get('user_id'), limit=limit)
return jsonify({
"logs": [
{
"id": row[0],
"action": row[1],
"project": row[2],
"cell": row[3],
"detail": row[4],
"ip_address": row[5],
"created_at": row[6]
}
for row in rows
]
})
@app.route('/api/projects', methods=['GET'])
@login_required_json
def list_projects():
"""List projects stored under database/<username>/layout."""
root = user_layout_root()
os.makedirs(root, exist_ok=True)
projects = []
for name in sorted(os.listdir(root)):
path = os.path.join(root, name)
if not os.path.isdir(path):
continue
cells = []
for filename in sorted(os.listdir(path)):
if not filename.lower().endswith(('.yml', '.yaml')):
continue
cell_name = os.path.splitext(filename)[0]
yml_path = os.path.join(path, filename)
cells.append({
"name": cell_name,
"has_layout": os.path.exists(yml_path)
})
meta = read_project_meta(name)
projects.append({
"name": name,
"cells": cells,
"technology": meta.get("technology")
})
return jsonify({"projects": projects})
@app.route('/api/projects', methods=['POST'])
@login_required_json
def create_project():
data = request.get_json(silent=True) or {}
requested_name = safe_name(data.get('name'), 'project_1')
technology = data.get('technology') or ''
root = user_layout_root()
os.makedirs(root, exist_ok=True)
project_name = requested_name
counter = 1
while os.path.exists(os.path.join(root, project_name)):
counter += 1
project_name = f"{requested_name}_{counter}"
os.makedirs(project_root(project_name), exist_ok=True)
write_project_meta(project_name, {
"name": project_name,
"technology": technology
})
record_action('project.create', project=project_name, detail={"technology": technology})
return jsonify({"name": project_name, "technology": technology}), 201
@app.route('/api/projects/<project_name>', methods=['GET'])
@login_required_json
def get_project(project_name):
"""Load all saved cells for a project."""
root = project_root(project_name)
if not os.path.isdir(root):
return jsonify({"error": "Project not found"}), 404
cells = []
for filename in sorted(os.listdir(root)):
if not filename.lower().endswith(('.yml', '.yaml')):
continue
cell_name = os.path.splitext(filename)[0]
yml_path = os.path.join(root, filename)
if not os.path.exists(yml_path):
continue
with open(yml_path, 'r', encoding='utf-8') as f:
cells.append({
"name": cell_name,
"content": f.read()
})
return jsonify({
"name": safe_name(project_name, 'project_1'),
"cells": cells,
"technology": read_project_meta(project_name).get("technology")
})
@app.route('/api/projects/<project_name>', methods=['DELETE'])
@login_required_json
def delete_project(project_name):
"""Delete a user's project folder under database/<username>/layout."""
root = project_root(project_name)
layout_root = os.path.abspath(user_layout_root())
target = os.path.abspath(root)
if not target.startswith(layout_root + os.sep):
return jsonify({"error": "Invalid project path"}), 400
if not os.path.isdir(target):
return jsonify({"error": "Project not found"}), 404
shutil.rmtree(target)
record_action('project.delete', project=project_name)
return jsonify({"message": "deleted", "project": safe_name(project_name, 'project_1')})
@app.route('/api/projects/<project_name>/cells/<cell_name>', methods=['PATCH', 'DELETE'])
@login_required_json
def rename_cell(project_name, cell_name):
if request.method == 'DELETE':
cell = safe_name(cell_name, 'canvas_1')
target = os.path.abspath(cell_file_path(project_name, cell))
project_dir = os.path.abspath(project_root(project_name))
if not target.startswith(project_dir + os.sep):
return jsonify({"error": "Invalid cell path"}), 400
if not os.path.exists(target):
record_action('canvas.delete_missing', project=project_name, cell=cell)
return jsonify({"message": "already deleted", "cell": cell})
os.remove(target)
record_action('canvas.delete', project=project_name, cell=cell)
return jsonify({"message": "deleted", "cell": cell})
data = request.get_json(silent=True) or {}
old_cell = safe_name(cell_name, 'canvas_1')
new_cell = safe_name(data.get('name'), old_cell)
if old_cell == new_cell:
return jsonify({"message": "unchanged", "cell": new_cell})
old_path = cell_file_path(project_name, old_cell)
new_path = cell_file_path(project_name, new_cell)
if os.path.exists(new_path):
return jsonify({"error": "Cell name already exists"}), 409
if os.path.exists(old_path):
os.rename(old_path, new_path)
record_action('canvas.rename', project=project_name, cell=new_cell, detail={"old_cell": old_cell})
return jsonify({"message": "renamed", "old_cell": old_cell, "cell": new_cell})
@app.route('/api/save-layout', methods=['POST'])
@login_required_json
def save_layout():
try:
data = request.get_json()
project = safe_name(data.get('project'), 'project_1')
cell = safe_name(data.get('cell'), 'canvas_1')
content = data.get('content', '')
save_path = cell_file_path(project, cell)
os.makedirs(os.path.dirname(save_path), exist_ok=True)
with open(save_path, 'w', encoding='utf-8') as f:
f.write(content)
record_action('layout.save', project=project, cell=cell, detail={"bytes": len(content)})
return jsonify({
"message": "successfully saved",
"project": project,
"cell": cell,
"path": save_path
}), 200
except Exception as e:
return jsonify({"error": str(e)}), 500
# --- API ROUTES (Library & Components) ---
@app.route('/api/library')
def getLib():
"""Get library structure."""
# tree = buildTree(YML_PATH)
if os.path.isdir(COMPS_ROOT):
compMap = findComps(COMPS_ROOT)
fresh_tree = addCompsToTree(compMap)
return jsonify(fresh_tree)
@app.route('/api/component/<component_name>')
def getComp(component_name):
"""Return component YAML data."""
data = readCompYaml(component_name)
if data is None:
return jsonify({"error": "Component not found"}), 404
return jsonify(data)
@app.route('/api/component/<component_name>/image')
def getCompImg(component_name):
"""Return first image in component folder."""
for root, dirs, files in os.walk(COMPS_ROOT):
if os.path.basename(root) == component_name:
dirs.clear()
for ext in ('.png', '.jpg', '.jpeg', '.svg'):
for f in files:
if f.lower().endswith(ext):
return send_from_directory(root, f)
break
return jsonify({"error": "No image found"}), 404
if __name__ == '__main__':
host = os.environ.get('MXPIC_HOST', '0.0.0.0')
port = int(os.environ.get('MXPIC_PORT', '3000'))
debug = os.environ.get('MXPIC_DEBUG', '0').lower() in {'1', 'true', 'yes'}
print(f"Starting mxpic EDA Server on http://{host}:{port}")
app.run(host=host, port=port, debug=debug, threaded=True)