Files
open-ww3-project-ww3-tw/blueprint/blog_views.py
2026-02-08 10:54:32 +08:00

184 lines
6.3 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from flask import Blueprint, render_template, request, url_for, flash, redirect, send_from_directory, make_response, current_app
from database import get_owp_db
from flask_autoindex import AutoIndex
from werkzeug.utils import secure_filename
from datetime import datetime
import os
import sqlite3
import time
import json
import shutil
def get_owp_db_conn(db_path='/var/open-ww3-project-ww3-tw/databases/sqlite/owp.db'):
    """Open a new SQLite connection to the OWP database.

    Args:
        db_path: Path of the SQLite database file. Defaults to the
            deployed ``owp.db`` location, so existing callers are unaffected.

    Returns:
        A ``sqlite3.Connection`` whose rows are ``sqlite3.Row`` objects, so
        columns can be read by name as well as by index. The caller is
        responsible for closing it.
    """
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    return conn
def get_message_db_conn(db_path='/var/open-ww3-project-ww3-tw/databases/sqlite/message.db'):
    """Open a new SQLite connection to the message database.

    Args:
        db_path: Path of the SQLite database file. Defaults to the
            deployed ``message.db`` location, so existing callers are
            unaffected.

    Returns:
        A ``sqlite3.Connection`` whose rows are ``sqlite3.Row`` objects, so
        columns can be read by name as well as by index. The caller is
        responsible for closing it.
    """
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    return conn
# Blueprint that carries every blog route registered in this module.
blog_bp = Blueprint('blog', __name__)
# Directory passed to AutoIndex as the root of the browsable tree.
index_path = '/var/open-ww3-project-ww3-tw/mirrors/'
# add_url_rules=False: AutoIndex does not register its own URL rules here;
# the listing is rendered explicitly through index.render_autoindex().
index = AutoIndex(blog_bp, browse_root=index_path, add_url_rules=False)
@blog_bp.route('/')
def home():
    """Render the blog home page.

    Loads all rows from ``logs`` and all rows from ``posts`` with
    ``status = 1``, increments the visit counter stored in a text file, and
    renders ``blog/home.html`` with both row lists reversed plus the new
    counter value.
    """
    conn = get_owp_db_conn()
    try:
        logs = conn.execute("SELECT * from logs;").fetchall()
        posts = conn.execute("SELECT * from posts where status= 1;").fetchall()
    finally:
        # Release the connection even when one of the queries raises.
        conn.close()

    count_file = '/var/open-ww3-project-ww3-tw/static/counter.txt'

    def load_counter():
        """Return the stored visit count, or 0 when it is missing or unreadable."""
        try:
            with open(count_file, 'r') as f:
                return int(f.read().strip())
        except (FileNotFoundError, ValueError):
            # An empty or corrupt file would otherwise fail every request,
            # because the value is read before it is ever rewritten.
            return 0

    def save_counter(value):
        """Overwrite the counter file with ``value``."""
        with open(count_file, 'w') as f:
            f.write(str(value))

    counter = load_counter() + 1
    save_counter(counter)
    return render_template('blog/home.html', logs=logs[::-1], posts=posts[::-1], counter=counter)
@blog_bp.route('/about/')
def about():
    """Render the static ``blog/about.html`` page."""
    return render_template('blog/about.html')
@blog_bp.route('/posts/')
def posts_list():
    """Render ``blog/list.html`` with every post whose ``status`` is 1.

    The rows are passed to the template in reverse of the order returned by
    the query.
    """
    conn = get_owp_db_conn()
    try:
        posts = conn.execute("SELECT * from posts where status= 1;").fetchall()
    finally:
        # Release the connection even when the query raises.
        conn.close()
    return render_template('blog/list.html', posts=posts[::-1])
@blog_bp.route('/posts/<int:posts_id>/')
def show_posts_id(posts_id):
    """Render a single published post together with its comments.

    Args:
        posts_id: Integer post id taken from the URL.

    Returns:
        The rendered ``blog/posts.html`` page, or a ``(body, 404)`` tuple
        when no post with ``status = 1`` has this id.
    """
    conn = get_owp_db_conn()
    try:
        posts = conn.execute(
            "SELECT * FROM posts WHERE status = 1 AND id = ?", (posts_id,)
        ).fetchone()
    finally:
        conn.close()

    if posts is None:
        # Answer before touching the filesystem: nothing is created for an
        # unknown id, and stored comments of a post that is merely not
        # published are never deleted by a page view.
        return f'未找到该文章{posts_id}<title>未找到该文章{posts_id}/</title>', 404

    conn_message = get_message_db_conn()
    try:
        message = conn_message.execute(
            "SELECT * FROM messages WHERE posts_id = ?", (posts_id,)
        ).fetchall()
    finally:
        conn_message.close()

    # Per-post comment store: make sure the directory and an empty JSON list
    # exist so the comment endpoint has a file to append to.
    message_path = f"/var/open-ww3-project-ww3-tw/static/message/{posts_id}"
    os.makedirs(message_path, exist_ok=True)
    message_json = f"{message_path}/chat.json"
    if not os.path.exists(message_json):
        with open(message_json, 'w', encoding='utf-8') as file:
            json.dump([], file, ensure_ascii=False)
    with open(message_json, 'r', encoding='utf-8') as file:
        message_ss = json.load(file)

    return render_template('blog/posts.html', posts=posts, message=message[::-1], message_ss=message_ss[::-1])
# Comments on a post
@blog_bp.route('/posts/<int:posts_id>/chat/', methods=['POST', 'GET'])
def posts_chat(posts_id):
    """Append a visitor comment to the post's ``chat.json`` file.

    Args:
        posts_id: Integer post id taken from the URL.

    Returns:
        For GET, the literal string ``"GET"``. For POST, a success page, a
        ``(body, 400)`` tuple when the ``data`` form field is missing or
        blank, or a ``(body, 404)`` tuple when the post has no comment file.
    """
    message_path = f"/var/open-ww3-project-ww3-tw/static/message/{posts_id}/chat.json"
    if request.method != "POST":
        return "GET"

    name = request.form.get("name", None)
    data = request.form.get("data", None)
    if data is None or not data.strip():
        # Do not persist comments without any content.
        return "留言内容不能为空<br><a href='../'>返回文章</a>", 400
    if not os.path.isfile(message_path):
        # The comment file is created when the post page is rendered; without
        # it there is nothing to append to, so report "not found" instead of
        # failing with an unhandled FileNotFoundError.
        return f'未找到该文章{posts_id}', 404

    json_push = {
        "name": name,
        "data": data,
        "date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    }
    with open(message_path, 'r', encoding='utf-8') as f:
        stored_comments = json.load(f)
    stored_comments.append(json_push)
    with open(message_path, 'w', encoding='utf-8') as f:
        json.dump(stored_comments, f, ensure_ascii=False, indent=4)
    return "留言成功<br><a href='../'>返回文章</a>"
# Database connectivity test
@blog_bp.route('/db_test/')
def test_db():
    """Report the SQLite version reached through ``get_owp_db()``.

    Returns a success string containing the version, or a failure string
    containing the exception text when the query raises.
    """
    db = get_owp_db()
    try:
        version_row = db.execute('SELECT sqlite_version();').fetchone()
        version = version_row[0]
    except Exception as err:
        return f"连接失败: {str(err)}"
    else:
        return f"数据库连接成功SQLite 版本是: {version}"
######
## Sitemap and robots configuration
@blog_bp.route('/robots.txt/')
def robots():
    """Serve ``robots.txt`` from the application's static folder."""
    static_dir = current_app.static_folder
    return send_from_directory(static_dir, 'robots.txt')
@blog_bp.route('/google02f6a3f6004a32c6.html')
def google02f6a3f6004a32c6():
    """Serve the ``google02f6a3f6004a32c6.html`` file.

    The file is read from the application's static folder, the same
    location the ``robots.txt`` route uses. The blueprint itself is created
    without a ``static_folder``, so ``blog_bp.static_folder`` is ``None``
    and cannot be used as the directory.
    """
    # NOTE(review): assumes the file is deployed next to robots.txt in the
    # application's static folder -- confirm on the server.
    return send_from_directory(current_app.static_folder, 'google02f6a3f6004a32c6.html')
@blog_bp.route('/sitemap.xml/')
def sitemap():
    """Render the XML sitemap for all posts with ``status = 1``.

    Returns:
        A response whose body is the rendered ``blog/sitemap.xml`` template
        and whose ``Content-Type`` header is ``application/xml``.
    """
    conn = get_owp_db_conn()
    try:
        posts = conn.execute("SELECT * from posts where status = 1;").fetchall()
    finally:
        # Release the connection even when the query raises.
        conn.close()
    template = render_template('blog/sitemap.xml', posts=posts[::-1], base_url="https://open-ww3-project.ww3.tw/blog/")
    response = make_response(template)
    response.headers['Content-Type'] = 'application/xml'
    return response
@blog_bp.route('/kami/')
def kami():
    """Redirect to the ``kami.home`` endpoint."""
    target = url_for('kami.home')
    return redirect(target)
@blog_bp.route('/mirrors/')
@blog_bp.route('/mirrors/<path:path>')
def autoindex(path='.'):
    """Render the mirrors listing for ``path`` with ``blog/mirrors.html``.

    Args:
        path: Path taken from the URL; ``'.'`` when the bare ``/mirrors/``
            URL is requested.
    """
    listing = index.render_autoindex(path, template='blog/mirrors.html')
    return listing
@blog_bp.route('/upload/', methods=['POST' , 'GET'])
def upload():
    """Accept an image upload (POST) or list the uploaded images (GET).

    POST reads the ``img`` file part, validates and sanitises its name, and
    stores it in the upload directory. GET renders ``blog/upload.html`` with
    the image file names found in that directory.

    Returns:
        The success page, a ``(body, 400)`` tuple for a missing file or a
        disallowed extension, or the rendered listing page.
    """
    img_files = '/var/open-ww3-project-ww3-tw/static/upload/img/'
    allowed_ext = ('.png', '.jpg', '.jpeg', '.gif')

    if request.method == "POST":
        f = request.files.get('img')
        if f is None or not f.filename:
            # No file part was submitted, or the file input was left empty.
            return '未选择文件', 400

        # Only accept the extensions the listing below displays.
        stem, ext = os.path.splitext(f.filename)
        ext = ext.lower()
        if ext not in allowed_ext:
            return '不支持的文件类型', 400

        # The client controls the file name: strip path separators and other
        # unsafe characters so the file cannot be written outside img_files.
        safe_stem = secure_filename(stem)
        if not safe_stem:
            # secure_filename() drops non-ASCII characters, so the stem can
            # end up empty; fall back to a timestamp-based name.
            safe_stem = datetime.now().strftime("%Y%m%d%H%M%S%f")
        f.save(os.path.join(img_files, f"{safe_stem}{ext}"))
        return '上传成功<meta http-equiv="refresh" content="1.2;url=https://ww3.tw/blog/upload/">'

    images = [img for img in os.listdir(img_files) if img.endswith(allowed_ext)]
    return render_template('blog/upload.html', images=images)
@blog_bp.route('/message/')
def messages():
    """Render the ``message/home.html`` page."""
    page = render_template('message/home.html')
    return page
@blog_bp.route('/paint/')
def paint():
    """Render ``blog/paint.html`` with the contents of ``paint.json``.

    The JSON document is loaded on every request and passed to the template
    as ``paint_open``.
    """
    paint_path = "/var/open-ww3-project-ww3-tw/static/upload/paint_data"
    paint_json = f"{paint_path}/paint.json"
    # Use a context manager so the file handle is closed deterministically.
    with open(paint_json, 'r', encoding='utf-8') as paint_file:
        paint_open = json.load(paint_file)
    return render_template('blog/paint.html', paint_open=paint_open)
@blog_bp.route('/paint/upload/', methods=['POST',])
def paint_upload():
    """Placeholder endpoint: answers a POST with the literal string ``"POST"``."""
    if request.method != "POST":
        return None
    return "POST"