from flask import Blueprint, render_template, request, url_for, flash, redirect, send_from_directory, make_response, current_app from database import get_owp_db from flask_autoindex import AutoIndex import os import sqlite3 import time def get_owp_db_conn(): conn = sqlite3.connect('/var/open-ww3-project-ww3-tw/databases/sqlite/owp.db') conn.row_factory = sqlite3.Row return conn def get_message_db_conn(): conn = sqlite3.connect('/var/open-ww3-project-ww3-tw/databases/sqlite/message.db') conn.row_factory = sqlite3.Row return conn blog_bp = Blueprint('blog', __name__) index_path = '/var/open-ww3-project-ww3-tw/mirrors/' index = AutoIndex(blog_bp, browse_root=index_path, add_url_rules=False) @blog_bp.route('/') def home(): db = get_owp_db() conn = get_owp_db_conn() sql_logs = "SELECT * from logs;" sql_posts = "SELECT * from posts where status= 1;" logs = conn.execute(sql_logs).fetchall() posts = conn.execute(sql_posts).fetchall() conn.close() return render_template('blog/home.html', logs=logs[::-1], posts=posts[::-1]) @blog_bp.route('/about/') def about(): return render_template('blog/about.html') pass @blog_bp.route('/posts/') def posts_list(): conn = get_owp_db_conn() sql_posts = "SELECT * from posts where status= 1;" posts = conn.execute(sql_posts).fetchall() conn.close() return render_template('blog/list.html', posts=posts[::-1]) @blog_bp.route('/posts//') def show_posts_id(posts_id): conn = get_owp_db_conn() conn_message = get_message_db_conn() sql_posts = "SELECT * FROM posts WHERE status = 1 AND id = ?" posts = conn.execute(sql_posts, (posts_id,)).fetchone() sql_message = "SELECT * FROM messages WHERE posts_id = ?" message = conn_message.execute(sql_message, (posts_id,)).fetchall() conn.close() conn_message.close() if posts is None: return f'未找到该文章{posts_id}未找到该文章{posts_id}', 404 return render_template('blog/posts.html', posts=posts, message=message[::-1]) # 数据库测试 @blog_bp.route('/db_test/') def test_db(): db = get_owp_db() try: cur = db.execute('SELECT sqlite_version();') ver = cur.fetchone()[0] return f"数据库连接成功!SQLite 版本是: {ver}" except Exception as e: return f"连接失败: {str(e)}" ###### ## 网站地图 及其 robots 配置 @blog_bp.route('/robots.txt/') def robots(): return send_from_directory(current_app.static_folder, 'robots.txt') @blog_bp.route('/google02f6a3f6004a32c6.html') def google02f6a3f6004a32c6(): return send_from_directory(blog_bp.static_folder, 'google02f6a3f6004a32c6.html') @blog_bp.route('/sitemap.xml/') def sitemap(): conn = get_owp_db_conn() sql_posts = "SELECT * from posts where status = 1;" posts = conn.execute(sql_posts).fetchall() conn.close() template = render_template('blog/sitemap.xml', posts=posts[::-1], base_url="https://open-ww3-project.ww3.tw/blog/") response = make_response(template) response.headers['Content-Type'] = 'application/xml' return response @blog_bp.route('/kami/') def kami(): return redirect(url_for('kami.home')) @blog_bp.route('/mirrors/') @blog_bp.route('/mirrors/') def autoindex(path='.'): return index.render_autoindex(path, template='blog/mirrors.html') @blog_bp.route('/upload/', methods=['POST' , 'GET']) def upload(): if request.method == "POST": f = request.files.get('img') filename = f.filename with open(f'/var/open-ww3-project-ww3-tw/static/upload/img/{filename}', 'wb') as tf: tf.write(f.read()) return '上传成功' img_files = '/var/open-ww3-project-ww3-tw/static/upload/img/' images = [img for img in os.listdir(img_files) if img.endswith(('.png', '.jpg', '.jpeg', '.gif'))] return render_template('blog/upload.html', images=images) @blog_bp.route('/message/') def messages(): return render_template('message/home.html')