from flask import Blueprint, render_template, request, url_for, flash, redirect, send_from_directory, make_response, current_app from database import get_owp_db from flask_autoindex import AutoIndex from werkzeug.utils import secure_filename from datetime import datetime import os import sqlite3 import time import json import shutil def get_owp_db_conn(): conn = sqlite3.connect('/var/open-ww3-project-ww3-tw/databases/sqlite/owp.db') conn.row_factory = sqlite3.Row return conn def get_message_db_conn(): conn = sqlite3.connect('/var/open-ww3-project-ww3-tw/databases/sqlite/message.db') conn.row_factory = sqlite3.Row return conn blog_bp = Blueprint('blog', __name__) index_path = '/var/open-ww3-project-ww3-tw/mirrors/' index = AutoIndex(blog_bp, browse_root=index_path, add_url_rules=False) # 重定向 @blog_bp.route('/mirrors/re/') def re(): return('404 not found
这是你不该知道的地方
') @blog_bp.route('/mirrors/paint/2024.2.13/临摹出来的乐色/不好说的人物/') @blog_bp.route('/mirrors/paint/2024.2.13/临摹出来的乐色/不好说的人物/') def no_say_man(subpath=None): return redirect(url_for('blog.re')) @blog_bp.route('/mirrors/game/') @blog_bp.route('/mirrors/game/') def no_game(subpath=None): return redirect(url_for('blog.re')) # 重定向 @blog_bp.route('/') def home(): db = get_owp_db() conn = get_owp_db_conn() sql_logs = "SELECT * from logs;" sql_posts = "SELECT * from posts where status= 1;" logs = conn.execute(sql_logs).fetchall() posts = conn.execute(sql_posts).fetchall() conn.close() count_file = '/var/open-ww3-project-ww3-tw/static/counter.txt' def load_couter(): if os.path.exists(count_file): with open(count_file, 'r') as f: return int(f.read().strip()) return 0 def save_couter(couter): with open(count_file, 'w') as f: f.write(str(couter)) counter = load_couter() counter += 1 save_couter(counter) return render_template('blog/home.html', logs=logs[::-1], posts=posts[::-1], counter=counter) @blog_bp.route('/about/') def about(): return render_template('blog/about.html') pass @blog_bp.route('/posts/') def posts_list(): conn = get_owp_db_conn() sql_posts = "SELECT * from posts where status= 1;" posts = conn.execute(sql_posts).fetchall() conn.close() return render_template('blog/list.html', posts=posts[::-1]) @blog_bp.route('/posts//') def show_posts_id(posts_id): message_path = f"/var/open-ww3-project-ww3-tw/static/message/{posts_id}" if not os.path.exists(message_path): os.makedirs(message_path) message_json = f"{message_path}/chat.json" if not os.path.exists(message_json): with open(message_json, 'w', encoding='utf-8') as file: json.dump([], file, ensure_ascii=False) message_ss = json.load(open(message_json, 'r', encoding='utf-8')) conn = get_owp_db_conn() conn_message = get_message_db_conn() sql_posts = "SELECT * FROM posts WHERE status = 1 AND id = ?" posts = conn.execute(sql_posts, (posts_id,)).fetchone() sql_message = "SELECT * FROM messages WHERE posts_id = ?" message = conn_message.execute(sql_message, (posts_id,)).fetchall() conn.close() conn_message.close() if posts is None: shutil.rmtree(message_path) return f'未找到该文章{posts_id}未找到该文章{posts_id}/', 404 return render_template('blog/posts.html', posts=posts, message=message[::-1], message_ss=message_ss[::-1]) # posts 留言 @blog_bp.route('/posts//chat/', methods=['POST', 'GET']) def posts_chat(posts_id): message_path = f"/var/open-ww3-project-ww3-tw/static/message/{posts_id}/chat.json" if request.method == "POST": name = request.form.get("name", None) data = request.form.get("data", None) date = datetime.now().strftime("%Y-%m-%d %H:%M:%S") json_push = { "name": name, "data": data, "date": date } with open(message_path, 'r', encoding='utf-8') as f: old_date_data = json.load(f) old_date_data.append(json_push) with open(message_path, 'w', encoding='utf-8') as f: json.dump(old_date_data, f, ensure_ascii=False, indent=4) return "留言成功
返回文章" return "GET" # 数据库测试 @blog_bp.route('/db_test/') def test_db(): db = get_owp_db() try: cur = db.execute('SELECT sqlite_version();') ver = cur.fetchone()[0] return f"数据库连接成功!SQLite 版本是: {ver}" except Exception as e: return f"连接失败: {str(e)}" ###### ## 网站地图 及其 robots 配置 @blog_bp.route('/robots.txt/') def robots(): return send_from_directory(current_app.static_folder, 'robots.txt') @blog_bp.route('/google02f6a3f6004a32c6.html') def google02f6a3f6004a32c6(): return send_from_directory(blog_bp.static_folder, 'google02f6a3f6004a32c6.html') @blog_bp.route('/sitemap.xml/') def sitemap(): conn = get_owp_db_conn() sql_posts = "SELECT * from posts where status = 1;" posts = conn.execute(sql_posts).fetchall() conn.close() template = render_template('blog/sitemap.xml', posts=posts[::-1], base_url="https://open-ww3-project.ww3.tw/blog/") response = make_response(template) response.headers['Content-Type'] = 'application/xml' return response @blog_bp.route('/kami/') def kami(): return redirect(url_for('kami.home')) @blog_bp.route('/mirrors/') @blog_bp.route('/mirrors/') def autoindex(path='.'): return index.render_autoindex(path, template='blog/mirrors.html') @blog_bp.route('/upload/', methods=['POST' , 'GET']) def upload(): if request.method == "POST": f = request.files.get('img') filename = f.filename with open(f'/var/open-ww3-project-ww3-tw/static/upload/img/{filename}', 'wb') as tf: tf.write(f.read()) return '上传成功' img_files = '/var/open-ww3-project-ww3-tw/static/upload/img/' images = [img for img in os.listdir(img_files) if img.endswith(('.png', '.jpg', '.jpeg', '.gif'))] return render_template('blog/upload.html', images=images) @blog_bp.route('/message/') def messages(): return render_template('message/home.html') @blog_bp.route('/paint/') def paint(): paint_path = "/var/open-ww3-project-ww3-tw/static/upload/paint_data" paint_json = f"{paint_path}/paint.json" paint_open = json.load(open(paint_json, 'r', encoding='utf-8')) return render_template('blog/paint.html', paint_open=paint_open) @blog_bp.route('/paint/upload/', methods=['POST',]) def paint_upload(): if request.method == "POST": return "POST"