585 lines
20 KiB
Python
585 lines
20 KiB
Python
|
|
import os
|
|
import re
|
|
import shutil
|
|
import json
|
|
import yaml
|
|
from collections import OrderedDict
|
|
from functools import wraps
|
|
from flask import Flask, jsonify, send_from_directory, request, redirect, url_for, session, render_template, make_response
|
|
from werkzeug.security import check_password_hash
|
|
import database
|
|
from flask import Response
|
|
|
|
# --- Path Configurations ---
|
|
# Directory containing this module; every other path below is derived from it.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
FRONTEND_DIR = os.path.join(BASE_DIR, '..', 'frontend')

# Root of the Silterra PDK component tree, and the directory index file inside it.
COMPS_ROOT = os.path.join(BASE_DIR, '..', 'mxpic', 'PDKs', 'Silterra')
YML_PATH = os.path.join(COMPS_ROOT, 'directories.yaml')

# Folder holding the per-category icons (adjust if it is placed elsewhere).
ICONS_DIR = os.path.join(BASE_DIR, 'icons')

# Root folder under which user layouts are saved.
DATABASE_ROOT = os.path.abspath(os.path.join(BASE_DIR, '..', 'database'))
|
|
|
|
|
|
# Templates and static assets are both served from the frontend folder.
app = Flask(__name__, static_folder=FRONTEND_DIR, template_folder=FRONTEND_DIR)

# NOTE(review): the fallback secret is a fixed literal; set MXPIC_SECRET_KEY
# in any real deployment.
app.secret_key = os.environ.get('MXPIC_SECRET_KEY', 'change_me_for_intranet_deployment')

# Session cookie settings; the Secure flag is opt-in through MXPIC_COOKIE_SECURE.
app.config['SESSION_COOKIE_HTTPONLY'] = True
app.config['SESSION_COOKIE_SAMESITE'] = 'Lax'
app.config['SESSION_COOKIE_SECURE'] = (
    os.environ.get('MXPIC_COOKIE_SECURE', '0').lower() in {'1', 'true', 'yes'}
)

# Keep JSON object keys in insertion order instead of sorting them.
app.json.sort_keys = False

database.init_db()
|
|
|
|
|
|
def no_cache_response(response):
    """Set cache-disabling headers on *response* and return the same object.

    Used so that editor assets are not served stale while canvas features
    are being revised.
    """
    no_cache_headers = (
        ('Cache-Control', 'no-store, no-cache, must-revalidate, max-age=0'),
        ('Pragma', 'no-cache'),
        ('Expires', '0'),
    )
    for header_name, header_value in no_cache_headers:
        response.headers[header_name] = header_value
    return response
|
|
|
|
|
|
def login_required_json(view_func):
    """Decorator for JSON endpoints: reply 401 unless the session has a user.

    When ``user_id`` is present in the session the wrapped view is called
    unchanged; otherwise a JSON error body with status 401 is returned.
    """
    @wraps(view_func)
    def wrapper(*args, **kwargs):
        if 'user_id' in session:
            return view_func(*args, **kwargs)
        return jsonify({"error": "Authentication required"}), 401

    return wrapper
|
|
|
|
|
|
def request_ip():
    """Return the client address of the current request.

    The first comma-separated entry of the ``X-Forwarded-For`` header is used
    when that header is non-empty; otherwise ``request.remote_addr``.
    """
    forwarded_for = request.headers.get('X-Forwarded-For', '')
    if not forwarded_for:
        return request.remote_addr
    first_entry = forwarded_for.partition(',')[0]
    return first_entry.strip()
|
|
|
|
|
|
def record_action(action, project=None, cell=None, detail=None):
    """Write one entry to the user log for the logged-in user.

    Does nothing when the session has no ``user_id``. ``detail`` is stored as
    JSON text when it is a dict or list, as ``str(detail)`` for any other
    non-None value, and as None otherwise. Project and cell names are passed
    through ``safe_name`` before being stored.
    """
    if 'user_id' not in session:
        return

    if isinstance(detail, (dict, list)):
        detail_text = json.dumps(detail, ensure_ascii=False)
    elif detail is None:
        detail_text = None
    else:
        detail_text = str(detail)

    project_label = safe_name(project, '') if project else None
    cell_label = safe_name(cell, '') if cell else None

    database.add_user_log(
        session.get('user_id'),
        session.get('username', 'unknown'),
        action,
        project=project_label,
        cell=cell_label,
        detail=detail_text,
        ip_address=request_ip(),
    )
|
|
|
|
|
|
def safe_name(value, fallback):
    """Return a filesystem-friendly form of a user, project or cell name.

    Args:
        value: The requested name. Usually a string or None, but values taken
            from request JSON may be of any type; non-string values are
            converted with ``str`` instead of raising.
        fallback: Name used when ``value`` is empty or sanitizes to nothing.

    Returns:
        The name with every run of characters outside ``A-Z a-z 0-9 _ . -``
        collapsed to a single ``_`` and leading/trailing ``.``/``_`` removed,
        or ``fallback`` when nothing is left.
    """
    # str() guards against non-string JSON values (e.g. a number), which
    # previously crashed on .strip(); falsy values still mean "use fallback".
    text = str(value or '').strip()
    if not text:
        text = fallback
    text = re.sub(r'[^A-Za-z0-9_.-]+', '_', text)
    # Stripping dots/underscores at the ends also neutralizes '.' and '..'.
    text = text.strip('._')
    return text or fallback
|
|
|
|
|
|
def user_layout_root():
    """Return ``DATABASE_ROOT/<username>/layout`` for the session user.

    The username is sanitized with ``safe_name`` and falls back to
    ``anonymous`` when the session has none.
    """
    owner = safe_name(session.get('username'), 'anonymous')
    return os.path.join(DATABASE_ROOT, owner, 'layout')
|
|
|
|
|
|
def project_root(project_name):
    """Return the folder of *project_name* inside the user's layout root.

    The name is sanitized with ``safe_name`` (fallback ``project_1``).
    """
    layout_root = user_layout_root()
    project_folder = safe_name(project_name, 'project_1')
    return os.path.join(layout_root, project_folder)
|
|
|
|
|
|
def cell_file_path(project_name, cell_name):
    """Return the ``.yml`` file path of a cell inside its project folder.

    The cell name is sanitized with ``safe_name`` (fallback ``canvas_1``).
    """
    project_dir = project_root(project_name)
    cell_stem = safe_name(cell_name, 'canvas_1')
    return os.path.join(project_dir, f"{cell_stem}.yml")
|
|
|
|
|
|
def project_meta_path(project_name):
    """Return the path of the project's ``.project.json`` metadata file."""
    project_dir = project_root(project_name)
    return os.path.join(project_dir, ".project.json")
|
|
|
|
|
|
def read_project_meta(project_name):
    """Load the metadata stored in a project's ``.project.json``.

    Args:
        project_name: Project whose metadata file is read.

    Returns:
        The decoded JSON object as a dict. An empty dict is returned when the
        file is missing, cannot be read, is not valid JSON, or does not hold
        a JSON object, so callers can always use ``.get`` on the result.
    """
    path = project_meta_path(project_name)
    try:
        # Open directly instead of checking existence first: avoids a race
        # with a concurrent delete and covers permission errors too.
        with open(path, 'r', encoding='utf-8') as f:
            meta = json.load(f)
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        return {}
    return meta if isinstance(meta, dict) else {}
|
|
|
|
|
|
def write_project_meta(project_name, meta):
    """Write *meta* as indented JSON to the project's ``.project.json``.

    The project folder is created first when it does not exist.
    """
    project_dir = project_root(project_name)
    os.makedirs(project_dir, exist_ok=True)

    meta_path = project_meta_path(project_name)
    with open(meta_path, 'w', encoding='utf-8') as meta_file:
        json.dump(meta, meta_file, indent=2)
|
|
|
|
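# NOTE(review): countSpaces and buildTree are referenced here but are not defined in the visible part of this file; getLib builds the library tree with findComps/addCompsToTree.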
|
|
|
|
def findComps(baseDir):
    """Walk *baseDir* and index every component folder found in it.

    A folder counts as a component when it directly contains at least one
    ``.yml`` file; the walk does not descend below such a folder.

    Returns:
        A dict keyed by a tuple of path segments: the parent folder's path
        relative to *baseDir*, followed by the component folder name. Each
        value holds ``folder`` (component folder name), ``yml`` (first
        ``.yml`` file as listed by ``os.walk``) and ``category`` (name of the
        parent folder).
    """
    compMap = {}
    for root, dirs, files in os.walk(baseDir):
        ymlFiles = [name for name in files if name.endswith('.yml')]
        if not ymlFiles:
            continue

        parentDir, compName = os.path.split(root)
        relPath = os.path.relpath(parentDir, baseDir)
        parts = tuple(relPath.split(os.sep)) if relPath != '.' else ()

        # The component name is part of the key so that several cells in the
        # same category do not overwrite each other.
        compMap[parts + (compName,)] = {
            'folder': compName,
            'yml': ymlFiles[0],
            'category': os.path.basename(parentDir),
        }

        # Prune the walk: nothing below a component folder is scanned.
        dirs[:] = []
    return compMap
|
|
|
|
def addCompsToTree(compMap):
    """Build a fresh nested tree from a component map.

    Every key of *compMap* is a tuple of path segments ending with the
    component name. The leading segments become nested ``OrderedDict`` nodes
    and the component itself becomes a leaf node carrying ``__type__``,
    ``__name__``, ``__yml__`` and ``__category__``.
    """
    tree = OrderedDict()

    for mapKey, compItem in compMap.items():
        # Descend along the path segments, creating missing levels on the way.
        node = tree
        for seg in mapKey[:-1]:
            node = node.setdefault(seg, OrderedDict())

        compName = compItem['folder']
        node[compName] = OrderedDict([
            ("__type__", "component"),
            ("__name__", compName),
            ("__yml__", compItem['yml']),
            ("__category__", compItem['category']),
        ])

    return tree
|
|
|
|
# NOTE: readCompYaml and the page routes are defined further down in this file.
|
|
|
|
# --- API ROUTES (Library, Components & Icons) ---
|
|
|
|
@app.route('/api/icon/<category>')
def getIcon(category):
    """Serve the icon that belongs to a component category.

    Lookup order inside ``ICONS_DIR``: ``<category>.png``, ``<category>.svg``,
    ``<category>.jpg``, then ``default.png``. When none of these exists, a
    built-in 1x1 PNG is returned instead.
    """
    candidates = [f"{category}{ext}" for ext in ('.png', '.svg', '.jpg')]
    candidates.append("default.png")

    for filename in candidates:
        if os.path.exists(os.path.join(ICONS_DIR, filename)):
            return send_from_directory(ICONS_DIR, filename)

    # Last resort: inline PNG so the client always receives an image.
    placeholder_png = (
        b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01'
        b'\x08\x06\x00\x00\x00\x1f\x15\xc4\x89\x00\x00\x00\nIDATx\x9cc\x00\x01'
        b'\x00\x00\x05\x00\x01\r\n\xf4\xc0\x00\x00\x00\x00IEND\xaeB`\x82'
    )
    return Response(placeholder_png, mimetype='image/png')
|
|
|
|
# NOTE: the remaining API routes are defined further down in this file.
|
|
|
|
def readCompYaml(compName):
    """Load the YAML description of the component folder named *compName*.

    ``COMPS_ROOT`` is walked for folders whose name equals *compName*; the
    first ``.yml`` file listed in such a folder is parsed with
    ``yaml.safe_load`` and returned. Returns None when no matching folder
    with a ``.yml`` file is found.
    """
    for root, dirs, files in os.walk(COMPS_ROOT):
        if os.path.basename(root) != compName:
            continue

        # Matching folder: do not descend into its sub-folders.
        dirs[:] = []
        candidates = [name for name in files if name.endswith('.yml')]
        if candidates:
            with open(os.path.join(root, candidates[0]), 'r', encoding='utf-8') as handle:
                return yaml.safe_load(handle)
    return None
|
|
|
|
# --- AUTHENTICATION & PAGE ROUTES ---
|
|
@app.route('/')
def home():
    """Show the login page, or go straight to the dashboard when logged in."""
    if 'user_id' not in session:
        return render_template('login.html')
    return redirect(url_for('dashboard'))
|
|
|
|
@app.route('/login', methods=['POST'])
def login():
    """Verify submitted credentials against the database.

    On success the user's id and name are stored in the session, the login is
    logged, and the browser is redirected to the dashboard. On failure the
    login page is rendered again with an error message.
    """
    # Missing form fields become empty strings: check_password_hash cannot
    # handle None and would turn an incomplete form into a server error.
    username = request.form.get('username') or ''
    password = request.form.get('password') or ''

    user = database.get_user(username) if username else None

    # user[2] holds the stored password hash, user[0]/user[1] the id and name.
    if user and check_password_hash(user[2], password):
        session['user_id'] = user[0]
        session['username'] = user[1]
        record_action('login')
        return redirect(url_for('dashboard'))

    return render_template('login.html', error="Invalid username or password")
|
|
|
|
@app.route('/dashboard')
def dashboard():
    """Render the user's project list; anonymous visitors go to the login page."""
    if 'user_id' in session:
        return render_template('dashboard.html', username=session['username'])
    return redirect(url_for('home'))
|
|
|
|
@app.route('/canvas')
def canvas():
    """Render the main EDA editor page with caching disabled.

    Anonymous visitors are redirected to the login page.
    """
    if 'user_id' not in session:
        return redirect(url_for('home'))

    # The editor template is canvas.html in the frontend folder.
    page = make_response(render_template('canvas.html'))
    return no_cache_response(page)
|
|
|
|
|
|
@app.route('/canvas-helpers.js')
def canvas_helpers():
    """Serve the shared helper script used by canvas.html, with caching disabled."""
    script = send_from_directory(FRONTEND_DIR, 'canvas-helpers.js')
    return no_cache_response(script)
|
|
|
|
@app.route('/logout')
def logout():
    """Log the logout, clear the session, and return to the login page."""
    # Must run before session.clear(): record_action is a no-op without user_id.
    record_action('logout')
    session.clear()
    login_url = url_for('home')
    return redirect(login_url)
|
|
|
|
|
|
@app.route('/api/health')
def health_check():
    """Return a fixed JSON payload indicating the service is up."""
    payload = {"status": "ok", "service": "mxpic_eda"}
    return jsonify(payload)
|
|
|
|
|
|
@app.route('/api/technologies', methods=['GET'])
@login_required_json
def list_technologies():
    """List technology choices found under mxpic/PDKs/<foundry>/<technology>.

    Only directories are considered at both levels; foundries and
    technologies are each listed in sorted order. A missing PDKs folder
    yields an empty list.
    """
    pdks_root = os.path.join(BASE_DIR, '..', 'mxpic', 'PDKs')
    foundries = sorted(os.listdir(pdks_root)) if os.path.isdir(pdks_root) else []

    technologies = []
    for foundry in foundries:
        foundry_path = os.path.join(pdks_root, foundry)
        if not os.path.isdir(foundry_path):
            continue
        technologies.extend(
            {
                "foundry": foundry,
                "technology": technology,
                "id": f"{foundry}/{technology}",
                "label": f"{foundry} / {technology}",
            }
            for technology in sorted(os.listdir(foundry_path))
            if os.path.isdir(os.path.join(foundry_path, technology))
        )

    return jsonify({"technologies": technologies})
|
|
|
|
|
|
@app.route('/api/profile', methods=['GET', 'PATCH'])
@login_required_json
def account_profile():
    """Return the logged-in user's profile; PATCH first updates the occupation.

    A PATCH whose ``occupation`` (trimmed, lower-cased) is not one of the
    allowed values is rejected with 400. Both methods answer with the current
    profile, or 404 when the database has no profile for the user.
    """
    occupations = {'intern', 'senior engineer', 'junior engineer', 'principle engineer'}
    user_id = session.get('user_id')

    if request.method == 'PATCH':
        payload = request.get_json(silent=True) or {}
        occupation = (payload.get('occupation') or '').strip().lower()
        if occupation not in occupations:
            return jsonify({"error": "Invalid occupation"}), 400
        database.update_user_occupation(user_id, occupation)
        record_action('profile.update_occupation', detail={"occupation": occupation})

    profile = database.get_user_profile(user_id)
    if not profile:
        return jsonify({"error": "Profile not found"}), 404

    # Profile row layout as used here: id, username, created_at, credits, occupation.
    body = {
        "id": f"mx-{int(profile[0]):06d}",
        "username": profile[1],
        "created_at": profile[2],
        "credits": profile[3] or 0,
        "occupation": profile[4] or "intern",
        "occupations": sorted(occupations),
    }
    return jsonify(body)
|
|
|
|
|
|
@app.route('/api/profile/password', methods=['POST'])
@login_required_json
def change_password():
    """Change the logged-in user's password.

    Expects JSON with ``current_password`` and ``new_password``. Answers 400
    when the new password is shorter than 6 characters or the current
    password does not match the stored hash.
    """
    payload = request.get_json(silent=True) or {}
    current_password = payload.get('current_password') or ''
    new_password = payload.get('new_password') or ''

    if len(new_password) < 6:
        return jsonify({"error": "New password must be at least 6 characters"}), 400

    user = database.get_user_auth_by_id(session.get('user_id'))
    # user[2] is the stored password hash, user[0] the user id.
    if not (user and check_password_hash(user[2], current_password)):
        return jsonify({"error": "Current password is incorrect"}), 400

    database.update_user_password(user[0], new_password)
    record_action('profile.change_password')
    return jsonify({"message": "Password updated"})
|
|
|
|
|
|
@app.route('/api/logs', methods=['GET', 'POST'])
@login_required_json
def user_logs():
    """Append to (POST) or list (GET) the logged-in user's action log.

    POST expects JSON with ``action`` and optional ``project``, ``cell`` and
    ``detail``; an empty action is rejected with 400. GET accepts a ``limit``
    query parameter that is clamped to 1..500 and defaults to 200 (also used
    when the value is not an integer).
    """
    if request.method == 'POST':
        payload = request.get_json(silent=True) or {}
        action = safe_name(payload.get('action'), '')
        if not action:
            return jsonify({"error": "Action is required"}), 400
        record_action(
            action,
            project=payload.get('project'),
            cell=payload.get('cell'),
            detail=payload.get('detail'),
        )
        return jsonify({"message": "logged"})

    try:
        requested = int(request.args.get('limit', 200))
        limit = min(500, max(1, requested))
    except ValueError:
        limit = 200

    rows = database.list_user_logs(session.get('user_id'), limit=limit)

    # Column order of each returned row, by position.
    columns = ("id", "action", "project", "cell", "detail", "ip_address", "created_at")
    logs = [
        {column: row[index] for index, column in enumerate(columns)}
        for row in rows
    ]
    return jsonify({"logs": logs})
|
|
|
|
|
|
@app.route('/api/projects', methods=['GET'])
@login_required_json
def list_projects():
    """List the projects stored under database/<username>/layout.

    The user's layout folder is created when missing. Each sub-folder is
    reported as a project with its ``.yml``/``.yaml`` files as cells and the
    ``technology`` value from its metadata file.
    """
    root = user_layout_root()
    os.makedirs(root, exist_ok=True)

    projects = []
    for name in sorted(os.listdir(root)):
        project_dir = os.path.join(root, name)
        if not os.path.isdir(project_dir):
            continue

        cells = [
            {
                "name": os.path.splitext(filename)[0],
                "has_layout": os.path.exists(os.path.join(project_dir, filename)),
            }
            for filename in sorted(os.listdir(project_dir))
            if filename.lower().endswith(('.yml', '.yaml'))
        ]
        meta = read_project_meta(name)
        projects.append({
            "name": name,
            "cells": cells,
            "technology": meta.get("technology"),
        })

    return jsonify({"projects": projects})
|
|
|
|
|
|
@app.route('/api/projects', methods=['POST'])
@login_required_json
def create_project():
    """Create a new project folder with its metadata file.

    The requested name is sanitized; when a folder with that name already
    exists, ``_2``, ``_3``, ... is appended until a free name is found.
    Answers 201 with the final project name and the technology.
    """
    payload = request.get_json(silent=True) or {}
    requested_name = safe_name(payload.get('name'), 'project_1')
    technology = payload.get('technology') or ''

    root = user_layout_root()
    os.makedirs(root, exist_ok=True)

    # Find the first unused name: base, base_2, base_3, ...
    project_name = requested_name
    suffix = 2
    while os.path.exists(os.path.join(root, project_name)):
        project_name = f"{requested_name}_{suffix}"
        suffix += 1

    os.makedirs(project_root(project_name), exist_ok=True)
    meta = {"name": project_name, "technology": technology}
    write_project_meta(project_name, meta)

    record_action('project.create', project=project_name, detail={"technology": technology})
    return jsonify({"name": project_name, "technology": technology}), 201
|
|
|
|
|
|
@app.route('/api/projects/<project_name>', methods=['GET'])
@login_required_json
def get_project(project_name):
    """Return every saved cell of a project together with its technology.

    Answers 404 when the project folder does not exist. Each ``.yml``/``.yaml``
    file in the folder is returned as a cell with its raw text content.
    """
    root = project_root(project_name)
    if not os.path.isdir(root):
        return jsonify({"error": "Project not found"}), 404

    cells = []
    for filename in sorted(os.listdir(root)):
        if not filename.lower().endswith(('.yml', '.yaml')):
            continue
        yml_path = os.path.join(root, filename)
        if not os.path.exists(yml_path):
            continue
        with open(yml_path, 'r', encoding='utf-8') as handle:
            content = handle.read()
        cells.append({
            "name": os.path.splitext(filename)[0],
            "content": content,
        })

    technology = read_project_meta(project_name).get("technology")
    return jsonify({
        "name": safe_name(project_name, 'project_1'),
        "cells": cells,
        "technology": technology,
    })
|
|
|
|
|
|
@app.route('/api/projects/<project_name>', methods=['DELETE'])
@login_required_json
def delete_project(project_name):
    """Delete a project folder under database/<username>/layout.

    Answers 400 when the resolved folder is not inside the user's layout
    root and 404 when the folder does not exist.
    """
    layout_root = os.path.abspath(user_layout_root())
    target = os.path.abspath(project_root(project_name))

    # Refuse anything that does not resolve to a path below the layout root.
    inside_layout_root = target.startswith(layout_root + os.sep)
    if not inside_layout_root:
        return jsonify({"error": "Invalid project path"}), 400
    if not os.path.isdir(target):
        return jsonify({"error": "Project not found"}), 404

    shutil.rmtree(target)
    record_action('project.delete', project=project_name)
    return jsonify({"message": "deleted", "project": safe_name(project_name, 'project_1')})
|
|
|
|
|
|
@app.route('/api/projects/<project_name>/cells/<cell_name>', methods=['PATCH', 'DELETE'])
@login_required_json
def rename_cell(project_name, cell_name):
    """Rename (PATCH) or delete (DELETE) a saved cell file of a project.

    DELETE removes the cell's ``.yml`` file; a file that is already gone is
    reported as ``already deleted``. PATCH expects JSON with the new ``name``;
    an existing target name yields 409, and when the old file does not exist
    nothing is moved but the rename is still reported and logged.
    """
    current_cell = safe_name(cell_name, 'canvas_1')

    if request.method == 'DELETE':
        cell_path = os.path.abspath(cell_file_path(project_name, current_cell))
        project_dir = os.path.abspath(project_root(project_name))
        if not cell_path.startswith(project_dir + os.sep):
            return jsonify({"error": "Invalid cell path"}), 400

        if os.path.exists(cell_path):
            os.remove(cell_path)
            record_action('canvas.delete', project=project_name, cell=current_cell)
            return jsonify({"message": "deleted", "cell": current_cell})

        record_action('canvas.delete_missing', project=project_name, cell=current_cell)
        return jsonify({"message": "already deleted", "cell": current_cell})

    payload = request.get_json(silent=True) or {}
    new_cell = safe_name(payload.get('name'), current_cell)
    if new_cell == current_cell:
        return jsonify({"message": "unchanged", "cell": new_cell})

    source_path = cell_file_path(project_name, current_cell)
    destination_path = cell_file_path(project_name, new_cell)
    if os.path.exists(destination_path):
        return jsonify({"error": "Cell name already exists"}), 409
    if os.path.exists(source_path):
        os.rename(source_path, destination_path)

    record_action(
        'canvas.rename',
        project=project_name,
        cell=new_cell,
        detail={"old_cell": current_cell},
    )
    return jsonify({"message": "renamed", "old_cell": current_cell, "cell": new_cell})
|
|
|
|
|
|
|
|
|
|
@app.route('/api/save-layout', methods=['POST'])
@login_required_json
def save_layout():
    """Save the text of one cell to its ``.yml`` file.

    Expects a JSON object with optional ``project`` and ``cell`` names
    (sanitized, defaulting to ``project_1`` / ``canvas_1``) and a string
    ``content``.

    Returns:
        200 with the project, cell and file path on success; 400 when the
        body is not a JSON object or a field has the wrong type; 500 when
        the file cannot be written.
    """
    # Client mistakes are answered with 400 instead of being caught by a
    # blanket handler and reported as a server error.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400

    for field in ('project', 'cell'):
        field_value = data.get(field)
        if field_value is not None and not isinstance(field_value, str):
            return jsonify({"error": f"{field} must be a string"}), 400

    content = data.get('content', '')
    if not isinstance(content, str):
        return jsonify({"error": "content must be a string"}), 400

    project = safe_name(data.get('project'), 'project_1')
    cell = safe_name(data.get('cell'), 'canvas_1')
    save_path = cell_file_path(project, cell)

    # Only filesystem failures are treated as a server-side error here.
    try:
        os.makedirs(os.path.dirname(save_path), exist_ok=True)
        with open(save_path, 'w', encoding='utf-8') as f:
            f.write(content)
    except OSError as e:
        return jsonify({"error": str(e)}), 500

    record_action('layout.save', project=project, cell=cell, detail={"bytes": len(content)})
    return jsonify({
        "message": "successfully saved",
        "project": project,
        "cell": cell,
        "path": save_path
    }), 200
|
|
|
|
|
|
|
|
# --- API ROUTES (Library & Components) ---
|
|
@app.route('/api/library')
def getLib():
    """Return the component library as a nested JSON tree.

    The tree is built from the component folders found under ``COMPS_ROOT``.
    When that folder does not exist an empty tree is returned, so the view
    always produces a valid response.
    """
    if not os.path.isdir(COMPS_ROOT):
        # Previously this path fell through and returned None, which Flask
        # rejects as an invalid view return value.
        return jsonify({})

    compMap = findComps(COMPS_ROOT)
    return jsonify(addCompsToTree(compMap))
|
|
|
|
|
|
|
|
@app.route('/api/component/<component_name>')
def getComp(component_name):
    """Return the parsed YAML of a component, or 404 when none is found."""
    data = readCompYaml(component_name)
    if data is not None:
        return jsonify(data)
    return jsonify({"error": "Component not found"}), 404
|
|
|
|
@app.route('/api/component/<component_name>/image')
def getCompImg(component_name):
    """Serve an image from the first folder named *component_name*.

    Extensions are tried in the order .png, .jpg, .jpeg, .svg; the first file
    matching the earliest extension is sent. Only the first matching folder
    is inspected; 404 is returned when it holds no image or no folder matches.
    """
    extensions = ('.png', '.jpg', '.jpeg', '.svg')

    for root, dirs, files in os.walk(COMPS_ROOT):
        if os.path.basename(root) != component_name:
            continue

        dirs[:] = []
        image_name = next(
            (f for ext in extensions for f in files if f.lower().endswith(ext)),
            None,
        )
        if image_name is not None:
            return send_from_directory(root, image_name)
        # First matching folder has no image: stop searching.
        break

    return jsonify({"error": "No image found"}), 404
|
|
|
|
if __name__ == '__main__':
    # Server settings come from the environment, with development defaults.
    env = os.environ
    host = env.get('MXPIC_HOST', '0.0.0.0')
    port = int(env.get('MXPIC_PORT', '3000'))
    debug = env.get('MXPIC_DEBUG', '0').lower() in {'1', 'true', 'yes'}

    print(f"Starting mxpic EDA Server on http://{host}:{port}")
    app.run(host=host, port=port, debug=debug, threaded=True)
|
|
|
|
|
|
|
|
|