421 lines
15 KiB
Python
421 lines
15 KiB
Python
from flask import Blueprint, render_template, request, redirect, url_for, flash, jsonify
|
|
from flask_login import login_required, current_user
|
|
from models import db, Product, Node, NodeImage, User, SiteConfig
|
|
from functools import wraps
|
|
import json
|
|
import os
|
|
from werkzeug.utils import secure_filename
|
|
|
|
admin_bp = Blueprint('admin', __name__)
|
|
|
|
# Add these configurations to your app
|
|
UPLOAD_FOLDER = 'static/uploads'
|
|
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif'}
|
|
|
|
def admin_required(f):
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
if not current_user.is_authenticated or not current_user.is_admin:
|
|
flash('You need admin privileges to access this page.', 'danger')
|
|
return redirect(url_for('auth.login'))
|
|
return f(*args, **kwargs)
|
|
return decorated_function
|
|
|
|
@admin_bp.route('/')
|
|
@login_required
|
|
@admin_required
|
|
def admin_dashboard():
|
|
products = Product.query.all()
|
|
return render_template('admin/admin_dashboard.html', products=products)
|
|
|
|
@admin_bp.route('/create_product', methods=['GET', 'POST'])
|
|
@login_required
|
|
@admin_required
|
|
def create_product():
|
|
if request.method == 'POST':
|
|
title = request.form.get('title')
|
|
thumbnail = request.form.get('thumbnail')
|
|
description = request.form.get('description', '')
|
|
|
|
# Get the root question and content from the form
|
|
root_question = request.form.get('root_question', 'How can we help you?')
|
|
root_content = request.form.get('root_content', 'Welcome to the decision tree.')
|
|
|
|
# Create a default root node for the decision tree
|
|
root_node = Node(question=root_question, content=root_content)
|
|
db.session.add(root_node)
|
|
db.session.commit()
|
|
|
|
# Create the product with the root node
|
|
product = Product(
|
|
title=title,
|
|
thumbnail=thumbnail,
|
|
description=description,
|
|
root_node_id=root_node.id
|
|
)
|
|
db.session.add(product)
|
|
db.session.commit()
|
|
|
|
flash("Product created successfully! Now you can build the decision tree.", "success")
|
|
return redirect(url_for('admin.edit_decision_tree', product_id=product.id))
|
|
|
|
return render_template('admin/create_product.html')
|
|
|
|
@admin_bp.route('/edit_product/<int:product_id>', methods=['GET', 'POST'])
|
|
@login_required
|
|
@admin_required
|
|
def edit_product(product_id):
|
|
product = Product.query.get_or_404(product_id)
|
|
|
|
if request.method == 'POST':
|
|
product.title = request.form.get('title')
|
|
product.thumbnail = request.form.get('thumbnail')
|
|
product.description = request.form.get('description', '')
|
|
|
|
db.session.commit()
|
|
flash("Product updated successfully!", "success")
|
|
return redirect(url_for('admin.admin_dashboard'))
|
|
|
|
return render_template('admin/edit_product.html', product=product)
|
|
|
|
@admin_bp.route('/delete_product/<int:product_id>')
|
|
@login_required
|
|
@admin_required
|
|
def delete_product(product_id):
|
|
product = Product.query.get_or_404(product_id)
|
|
|
|
# Delete the root node and all associated nodes (this would need a more sophisticated
|
|
# implementation to delete the entire tree)
|
|
if product.root_node:
|
|
db.session.delete(product.root_node)
|
|
|
|
db.session.delete(product)
|
|
db.session.commit()
|
|
|
|
flash("Product deleted successfully!", "success")
|
|
return redirect(url_for('admin.admin_dashboard'))
|
|
|
|
@admin_bp.route('/edit_decision_tree/<int:product_id>')
|
|
@login_required
|
|
@admin_required
|
|
def edit_decision_tree(product_id):
|
|
product = Product.query.get_or_404(product_id)
|
|
|
|
# Generate tree data for visualization
|
|
tree_data = generate_tree_data(product.root_node, current_node_id=None)
|
|
tree_json = json.dumps(tree_data)
|
|
|
|
return render_template('admin/edit_decision_tree.html', product=product, tree_json=tree_json)
|
|
|
|
@admin_bp.route('/node/<int:node_id>', methods=['GET', 'POST'])
|
|
@login_required
|
|
@admin_required
|
|
def edit_node(node_id):
|
|
node = Node.query.get_or_404(node_id)
|
|
|
|
# Find which product this node belongs to
|
|
product = find_product_for_node(node)
|
|
|
|
if request.method == 'POST':
|
|
node.question = request.form.get('question')
|
|
node.content = request.form.get('content')
|
|
node.is_terminal = 'is_terminal' in request.form
|
|
|
|
# Handle YouTube URL
|
|
youtube_url = request.form.get('youtube_url')
|
|
if youtube_url:
|
|
node.youtube_url = youtube_url
|
|
elif node.youtube_url and not youtube_url:
|
|
# Clear the YouTube URL if the field is empty
|
|
node.youtube_url = None
|
|
|
|
# Handle image upload
|
|
if 'image_upload' in request.files:
|
|
file = request.files['image_upload']
|
|
if file and file.filename and allowed_file(file.filename):
|
|
# Create upload directory if it doesn't exist
|
|
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
|
|
|
|
# Generate a secure filename with node ID to avoid conflicts
|
|
filename = secure_filename(f"node_{node.id}_{file.filename}")
|
|
filepath = os.path.join(UPLOAD_FOLDER, filename)
|
|
|
|
# Save the file
|
|
file.save(filepath)
|
|
|
|
# Create a new NodeImage
|
|
caption = request.form.get('image_caption', '')
|
|
|
|
# Get the highest display order
|
|
max_order = db.session.query(db.func.max(NodeImage.display_order)).filter(NodeImage.node_id == node.id).scalar() or 0
|
|
|
|
# Create new image with next order
|
|
new_image = NodeImage(
|
|
node_id=node.id,
|
|
image_url=f"/{filepath}",
|
|
caption=caption,
|
|
display_order=max_order + 1
|
|
)
|
|
db.session.add(new_image)
|
|
|
|
# Handle image removal
|
|
for key in request.form:
|
|
if key.startswith('remove_image_'):
|
|
try:
|
|
image_id = int(key.split('_')[-1])
|
|
image = NodeImage.query.get(image_id)
|
|
if image and image.node_id == node.id:
|
|
# Delete the file if it exists
|
|
if image.image_url and os.path.exists(image.image_url.lstrip('/')):
|
|
os.remove(image.image_url.lstrip('/'))
|
|
|
|
# Delete the database record
|
|
db.session.delete(image)
|
|
except (ValueError, AttributeError):
|
|
pass
|
|
|
|
# Handle image reordering
|
|
for key in request.form:
|
|
if key.startswith('image_order_'):
|
|
try:
|
|
image_id = int(key.split('_')[-1])
|
|
new_order = int(request.form[key])
|
|
image = NodeImage.query.get(image_id)
|
|
if image and image.node_id == node.id:
|
|
image.display_order = new_order
|
|
except (ValueError, AttributeError):
|
|
pass
|
|
|
|
db.session.commit()
|
|
flash("Node updated successfully!", "success")
|
|
|
|
return redirect(url_for('admin.edit_node', node_id=node.id))
|
|
|
|
# Generate tree data for visualization
|
|
if product:
|
|
tree_data = generate_tree_data(product.root_node, current_node_id=node.id)
|
|
tree_json = json.dumps(tree_data)
|
|
else:
|
|
tree_json = '{}'
|
|
|
|
return render_template('admin/edit_node.html', node=node, product=product, tree_json=tree_json)
|
|
|
|
# Helper function to find which product a node belongs to
|
|
def find_product_for_node(node):
|
|
# First check if this is a root node
|
|
product = Product.query.filter_by(root_node_id=node.id).first()
|
|
|
|
if not product:
|
|
# Try to find the product by traversing up the tree
|
|
for p in Product.query.all():
|
|
root_node = p.root_node
|
|
if root_node:
|
|
# Check if node is in the tree
|
|
if is_node_in_tree(root_node, node.id):
|
|
product = p
|
|
break
|
|
|
|
return product
|
|
|
|
# Helper function to check if a node is in a tree
|
|
def is_node_in_tree(root, node_id):
|
|
if not root:
|
|
return False
|
|
|
|
if root.id == node_id:
|
|
return True
|
|
|
|
# Check yes branch
|
|
if root.yes_node_id and is_node_in_tree(root.yes_node, node_id):
|
|
return True
|
|
|
|
# Check no branch
|
|
if root.no_node_id and is_node_in_tree(root.no_node, node_id):
|
|
return True
|
|
|
|
return False
|
|
|
|
# Helper function to generate tree data for D3.js visualization
|
|
def generate_tree_data(node, current_node_id=None):
|
|
if not node:
|
|
return None
|
|
|
|
# Create the node data
|
|
node_data = {
|
|
'name': node.question[:30] + ('...' if len(node.question) > 30 else ''),
|
|
'id': node.id,
|
|
'current': node.id == current_node_id,
|
|
'type': 'root', # Default type
|
|
'is_terminal': node.is_terminal, # Make sure this property is included
|
|
'children': []
|
|
}
|
|
|
|
# Add yes branch
|
|
if node.yes_node:
|
|
yes_data = generate_tree_data(node.yes_node, current_node_id)
|
|
yes_data['type'] = 'yes'
|
|
node_data['children'].append(yes_data)
|
|
|
|
# Add no branch
|
|
if node.no_node:
|
|
no_data = generate_tree_data(node.no_node, current_node_id)
|
|
no_data['type'] = 'no'
|
|
node_data['children'].append(no_data)
|
|
|
|
return node_data
|
|
|
|
@admin_bp.route('/add_child_node/<int:parent_id>/<string:direction>', methods=['POST'])
|
|
@login_required
|
|
@admin_required
|
|
def add_child_node(parent_id, direction):
|
|
parent = Node.query.get_or_404(parent_id)
|
|
|
|
# Create a new node
|
|
new_node = Node(
|
|
question=request.form.get('question', 'New Question'),
|
|
content=request.form.get('content', 'New Content')
|
|
)
|
|
db.session.add(new_node)
|
|
db.session.commit()
|
|
|
|
# Link the new node to the parent
|
|
if direction == 'yes':
|
|
parent.yes_node_id = new_node.id
|
|
else:
|
|
parent.no_node_id = new_node.id
|
|
|
|
db.session.commit()
|
|
|
|
# Find which product this node belongs to
|
|
product = None
|
|
# First check if this is a root node
|
|
product = Product.query.filter_by(root_node_id=parent.id).first()
|
|
|
|
if not product:
|
|
# Try to find the product by traversing up the tree
|
|
# This is a simplified approach and might not work for complex trees
|
|
for p in Product.query.all():
|
|
root_node = p.root_node
|
|
if root_node:
|
|
# Check if parent is in the yes branch
|
|
current = root_node
|
|
while current and current.yes_node_id:
|
|
if current.yes_node_id == parent.id:
|
|
product = p
|
|
break
|
|
current = current.yes_node
|
|
|
|
# Check if parent is in the no branch
|
|
current = root_node
|
|
while current and current.no_node_id:
|
|
if current.no_node_id == parent.id:
|
|
product = p
|
|
break
|
|
current = current.no_node
|
|
|
|
flash("Node added successfully!", "success")
|
|
|
|
# Redirect to edit the new node instead of going back to the dashboard
|
|
return redirect(url_for('admin.edit_node', node_id=new_node.id))
|
|
|
|
# Helper function to check if file extension is allowed
|
|
def allowed_file(filename):
|
|
return '.' in filename and \
|
|
filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
|
|
|
|
@admin_bp.route('/run_migrations')
|
|
@login_required
|
|
@admin_required
|
|
def run_migrations():
|
|
"""Run database migrations"""
|
|
try:
|
|
# Check if columns already exist using the new SQLAlchemy 2.0 API
|
|
with db.engine.connect() as conn:
|
|
result = conn.execute(db.text("PRAGMA table_info(node)"))
|
|
columns = result.fetchall()
|
|
column_names = [col[1] for col in columns]
|
|
|
|
# Add image_url column if it doesn't exist
|
|
if 'image_url' not in column_names:
|
|
conn.execute(db.text("ALTER TABLE node ADD COLUMN image_url VARCHAR(500)"))
|
|
flash("Added image_url column to Node table", "success")
|
|
|
|
# Add youtube_url column if it doesn't exist
|
|
if 'youtube_url' not in column_names:
|
|
conn.execute(db.text("ALTER TABLE node ADD COLUMN youtube_url VARCHAR(500)"))
|
|
flash("Added youtube_url column to Node table", "success")
|
|
|
|
conn.commit()
|
|
|
|
flash("Database migration completed successfully", "success")
|
|
except Exception as e:
|
|
flash(f"Error during migration: {str(e)}", "danger")
|
|
|
|
return redirect(url_for('admin.admin_dashboard'))
|
|
|
|
@admin_bp.route('/users')
|
|
@login_required
|
|
@admin_required
|
|
def manage_users():
|
|
users = User.query.all()
|
|
registration_enabled = SiteConfig.get_setting('registration_enabled', 'true') == 'true'
|
|
return render_template('admin/manage_users.html', users=users, registration_enabled=registration_enabled)
|
|
|
|
@admin_bp.route('/users/toggle_registration')
|
|
@login_required
|
|
@admin_required
|
|
def toggle_registration():
|
|
current_setting = SiteConfig.get_setting('registration_enabled', 'true')
|
|
new_setting = 'false' if current_setting == 'true' else 'true'
|
|
SiteConfig.set_setting('registration_enabled', new_setting, 'Controls whether user registration is enabled')
|
|
|
|
status = "enabled" if new_setting == 'true' else "disabled"
|
|
flash(f"User registration has been {status}", "success")
|
|
return redirect(url_for('admin.manage_users'))
|
|
|
|
@admin_bp.route('/users/<int:user_id>/toggle_admin')
|
|
@login_required
|
|
@admin_required
|
|
def toggle_admin(user_id):
|
|
user = User.query.get_or_404(user_id)
|
|
|
|
# Prevent removing admin status from the last admin
|
|
if user.is_admin and User.query.filter_by(is_admin=True).count() <= 1:
|
|
flash("Cannot remove admin status from the last admin user", "danger")
|
|
return redirect(url_for('admin.manage_users'))
|
|
|
|
# Don't allow changing your own admin status
|
|
if user.id == current_user.id:
|
|
flash("You cannot change your own admin status", "danger")
|
|
return redirect(url_for('admin.manage_users'))
|
|
|
|
user.is_admin = not user.is_admin
|
|
db.session.commit()
|
|
|
|
status = "granted" if user.is_admin else "revoked"
|
|
flash(f"Admin privileges {status} for {user.username}", "success")
|
|
return redirect(url_for('admin.manage_users'))
|
|
|
|
@admin_bp.route('/users/<int:user_id>/delete', methods=['POST'])
|
|
@login_required
|
|
@admin_required
|
|
def delete_user(user_id):
|
|
user = User.query.get_or_404(user_id)
|
|
|
|
# Don't allow deleting yourself
|
|
if user.id == current_user.id:
|
|
flash("You cannot delete your own account", "danger")
|
|
return redirect(url_for('admin.manage_users'))
|
|
|
|
# Don't allow deleting the last admin
|
|
if user.is_admin and User.query.filter_by(is_admin=True).count() <= 1:
|
|
flash("Cannot delete the last admin user", "danger")
|
|
return redirect(url_for('admin.manage_users'))
|
|
|
|
username = user.username
|
|
db.session.delete(user)
|
|
db.session.commit()
|
|
|
|
flash(f"User '{username}' has been deleted", "success")
|
|
return redirect(url_for('admin.manage_users'))
|