DecisionTree/admin.py

421 lines
15 KiB
Python

from flask import Blueprint, render_template, request, redirect, url_for, flash, jsonify
from flask_login import login_required, current_user
from models import db, Product, Node, NodeImage, User, SiteConfig
from functools import wraps
import json
import os
from werkzeug.utils import secure_filename
admin_bp = Blueprint('admin', __name__)
# Add these configurations to your app
UPLOAD_FOLDER = 'static/uploads'
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif'}
def admin_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not current_user.is_authenticated or not current_user.is_admin:
flash('You need admin privileges to access this page.', 'danger')
return redirect(url_for('auth.login'))
return f(*args, **kwargs)
return decorated_function
@admin_bp.route('/')
@login_required
@admin_required
def admin_dashboard():
products = Product.query.all()
return render_template('admin/admin_dashboard.html', products=products)
@admin_bp.route('/create_product', methods=['GET', 'POST'])
@login_required
@admin_required
def create_product():
if request.method == 'POST':
title = request.form.get('title')
thumbnail = request.form.get('thumbnail')
description = request.form.get('description', '')
# Get the root question and content from the form
root_question = request.form.get('root_question', 'How can we help you?')
root_content = request.form.get('root_content', 'Welcome to the decision tree.')
# Create a default root node for the decision tree
root_node = Node(question=root_question, content=root_content)
db.session.add(root_node)
db.session.commit()
# Create the product with the root node
product = Product(
title=title,
thumbnail=thumbnail,
description=description,
root_node_id=root_node.id
)
db.session.add(product)
db.session.commit()
flash("Product created successfully! Now you can build the decision tree.", "success")
return redirect(url_for('admin.edit_decision_tree', product_id=product.id))
return render_template('admin/create_product.html')
@admin_bp.route('/edit_product/<int:product_id>', methods=['GET', 'POST'])
@login_required
@admin_required
def edit_product(product_id):
product = Product.query.get_or_404(product_id)
if request.method == 'POST':
product.title = request.form.get('title')
product.thumbnail = request.form.get('thumbnail')
product.description = request.form.get('description', '')
db.session.commit()
flash("Product updated successfully!", "success")
return redirect(url_for('admin.admin_dashboard'))
return render_template('admin/edit_product.html', product=product)
@admin_bp.route('/delete_product/<int:product_id>')
@login_required
@admin_required
def delete_product(product_id):
product = Product.query.get_or_404(product_id)
# Delete the root node and all associated nodes (this would need a more sophisticated
# implementation to delete the entire tree)
if product.root_node:
db.session.delete(product.root_node)
db.session.delete(product)
db.session.commit()
flash("Product deleted successfully!", "success")
return redirect(url_for('admin.admin_dashboard'))
@admin_bp.route('/edit_decision_tree/<int:product_id>')
@login_required
@admin_required
def edit_decision_tree(product_id):
product = Product.query.get_or_404(product_id)
# Generate tree data for visualization
tree_data = generate_tree_data(product.root_node, current_node_id=None)
tree_json = json.dumps(tree_data)
return render_template('admin/edit_decision_tree.html', product=product, tree_json=tree_json)
@admin_bp.route('/node/<int:node_id>', methods=['GET', 'POST'])
@login_required
@admin_required
def edit_node(node_id):
node = Node.query.get_or_404(node_id)
# Find which product this node belongs to
product = find_product_for_node(node)
if request.method == 'POST':
node.question = request.form.get('question')
node.content = request.form.get('content')
node.is_terminal = 'is_terminal' in request.form
# Handle YouTube URL
youtube_url = request.form.get('youtube_url')
if youtube_url:
node.youtube_url = youtube_url
elif node.youtube_url and not youtube_url:
# Clear the YouTube URL if the field is empty
node.youtube_url = None
# Handle image upload
if 'image_upload' in request.files:
file = request.files['image_upload']
if file and file.filename and allowed_file(file.filename):
# Create upload directory if it doesn't exist
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
# Generate a secure filename with node ID to avoid conflicts
filename = secure_filename(f"node_{node.id}_{file.filename}")
filepath = os.path.join(UPLOAD_FOLDER, filename)
# Save the file
file.save(filepath)
# Create a new NodeImage
caption = request.form.get('image_caption', '')
# Get the highest display order
max_order = db.session.query(db.func.max(NodeImage.display_order)).filter(NodeImage.node_id == node.id).scalar() or 0
# Create new image with next order
new_image = NodeImage(
node_id=node.id,
image_url=f"/{filepath}",
caption=caption,
display_order=max_order + 1
)
db.session.add(new_image)
# Handle image removal
for key in request.form:
if key.startswith('remove_image_'):
try:
image_id = int(key.split('_')[-1])
image = NodeImage.query.get(image_id)
if image and image.node_id == node.id:
# Delete the file if it exists
if image.image_url and os.path.exists(image.image_url.lstrip('/')):
os.remove(image.image_url.lstrip('/'))
# Delete the database record
db.session.delete(image)
except (ValueError, AttributeError):
pass
# Handle image reordering
for key in request.form:
if key.startswith('image_order_'):
try:
image_id = int(key.split('_')[-1])
new_order = int(request.form[key])
image = NodeImage.query.get(image_id)
if image and image.node_id == node.id:
image.display_order = new_order
except (ValueError, AttributeError):
pass
db.session.commit()
flash("Node updated successfully!", "success")
return redirect(url_for('admin.edit_node', node_id=node.id))
# Generate tree data for visualization
if product:
tree_data = generate_tree_data(product.root_node, current_node_id=node.id)
tree_json = json.dumps(tree_data)
else:
tree_json = '{}'
return render_template('admin/edit_node.html', node=node, product=product, tree_json=tree_json)
# Helper function to find which product a node belongs to
def find_product_for_node(node):
# First check if this is a root node
product = Product.query.filter_by(root_node_id=node.id).first()
if not product:
# Try to find the product by traversing up the tree
for p in Product.query.all():
root_node = p.root_node
if root_node:
# Check if node is in the tree
if is_node_in_tree(root_node, node.id):
product = p
break
return product
# Helper function to check if a node is in a tree
def is_node_in_tree(root, node_id):
if not root:
return False
if root.id == node_id:
return True
# Check yes branch
if root.yes_node_id and is_node_in_tree(root.yes_node, node_id):
return True
# Check no branch
if root.no_node_id and is_node_in_tree(root.no_node, node_id):
return True
return False
# Helper function to generate tree data for D3.js visualization
def generate_tree_data(node, current_node_id=None):
if not node:
return None
# Create the node data
node_data = {
'name': node.question[:30] + ('...' if len(node.question) > 30 else ''),
'id': node.id,
'current': node.id == current_node_id,
'type': 'root', # Default type
'is_terminal': node.is_terminal, # Make sure this property is included
'children': []
}
# Add yes branch
if node.yes_node:
yes_data = generate_tree_data(node.yes_node, current_node_id)
yes_data['type'] = 'yes'
node_data['children'].append(yes_data)
# Add no branch
if node.no_node:
no_data = generate_tree_data(node.no_node, current_node_id)
no_data['type'] = 'no'
node_data['children'].append(no_data)
return node_data
@admin_bp.route('/add_child_node/<int:parent_id>/<string:direction>', methods=['POST'])
@login_required
@admin_required
def add_child_node(parent_id, direction):
parent = Node.query.get_or_404(parent_id)
# Create a new node
new_node = Node(
question=request.form.get('question', 'New Question'),
content=request.form.get('content', 'New Content')
)
db.session.add(new_node)
db.session.commit()
# Link the new node to the parent
if direction == 'yes':
parent.yes_node_id = new_node.id
else:
parent.no_node_id = new_node.id
db.session.commit()
# Find which product this node belongs to
product = None
# First check if this is a root node
product = Product.query.filter_by(root_node_id=parent.id).first()
if not product:
# Try to find the product by traversing up the tree
# This is a simplified approach and might not work for complex trees
for p in Product.query.all():
root_node = p.root_node
if root_node:
# Check if parent is in the yes branch
current = root_node
while current and current.yes_node_id:
if current.yes_node_id == parent.id:
product = p
break
current = current.yes_node
# Check if parent is in the no branch
current = root_node
while current and current.no_node_id:
if current.no_node_id == parent.id:
product = p
break
current = current.no_node
flash("Node added successfully!", "success")
# Redirect to edit the new node instead of going back to the dashboard
return redirect(url_for('admin.edit_node', node_id=new_node.id))
# Helper function to check if file extension is allowed
def allowed_file(filename):
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
@admin_bp.route('/run_migrations')
@login_required
@admin_required
def run_migrations():
"""Run database migrations"""
try:
# Check if columns already exist using the new SQLAlchemy 2.0 API
with db.engine.connect() as conn:
result = conn.execute(db.text("PRAGMA table_info(node)"))
columns = result.fetchall()
column_names = [col[1] for col in columns]
# Add image_url column if it doesn't exist
if 'image_url' not in column_names:
conn.execute(db.text("ALTER TABLE node ADD COLUMN image_url VARCHAR(500)"))
flash("Added image_url column to Node table", "success")
# Add youtube_url column if it doesn't exist
if 'youtube_url' not in column_names:
conn.execute(db.text("ALTER TABLE node ADD COLUMN youtube_url VARCHAR(500)"))
flash("Added youtube_url column to Node table", "success")
conn.commit()
flash("Database migration completed successfully", "success")
except Exception as e:
flash(f"Error during migration: {str(e)}", "danger")
return redirect(url_for('admin.admin_dashboard'))
@admin_bp.route('/users')
@login_required
@admin_required
def manage_users():
users = User.query.all()
registration_enabled = SiteConfig.get_setting('registration_enabled', 'true') == 'true'
return render_template('admin/manage_users.html', users=users, registration_enabled=registration_enabled)
@admin_bp.route('/users/toggle_registration')
@login_required
@admin_required
def toggle_registration():
current_setting = SiteConfig.get_setting('registration_enabled', 'true')
new_setting = 'false' if current_setting == 'true' else 'true'
SiteConfig.set_setting('registration_enabled', new_setting, 'Controls whether user registration is enabled')
status = "enabled" if new_setting == 'true' else "disabled"
flash(f"User registration has been {status}", "success")
return redirect(url_for('admin.manage_users'))
@admin_bp.route('/users/<int:user_id>/toggle_admin')
@login_required
@admin_required
def toggle_admin(user_id):
user = User.query.get_or_404(user_id)
# Prevent removing admin status from the last admin
if user.is_admin and User.query.filter_by(is_admin=True).count() <= 1:
flash("Cannot remove admin status from the last admin user", "danger")
return redirect(url_for('admin.manage_users'))
# Don't allow changing your own admin status
if user.id == current_user.id:
flash("You cannot change your own admin status", "danger")
return redirect(url_for('admin.manage_users'))
user.is_admin = not user.is_admin
db.session.commit()
status = "granted" if user.is_admin else "revoked"
flash(f"Admin privileges {status} for {user.username}", "success")
return redirect(url_for('admin.manage_users'))
@admin_bp.route('/users/<int:user_id>/delete', methods=['POST'])
@login_required
@admin_required
def delete_user(user_id):
user = User.query.get_or_404(user_id)
# Don't allow deleting yourself
if user.id == current_user.id:
flash("You cannot delete your own account", "danger")
return redirect(url_for('admin.manage_users'))
# Don't allow deleting the last admin
if user.is_admin and User.query.filter_by(is_admin=True).count() <= 1:
flash("Cannot delete the last admin user", "danger")
return redirect(url_for('admin.manage_users'))
username = user.username
db.session.delete(user)
db.session.commit()
flash(f"User '{username}' has been deleted", "success")
return redirect(url_for('admin.manage_users'))