Files
qinqinghuisheng/server/app.py
2025-12-13 14:46:05 +08:00

2426 lines
73 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
KinEcho 服务端 - Flask应用主入口
支持家人端和老人端的日程同步管理
"""
from flask import Flask, request, jsonify, send_from_directory, Response
from flask_cors import CORS
from datetime import datetime, timedelta, timezone
import sqlite3
import os
import json
import time
import requests
from werkzeug.utils import secure_filename
# Flask application object shared by every route in this module.
app = Flask(__name__)
CORS(app)  # Allow cross-origin requests
# Beijing time zone (UTC+8)
BEIJING_TZ = timezone(timedelta(hours=8))
def get_beijing_time():
    """Return the current moment as a timezone-aware datetime in UTC+8."""
    beijing_now = datetime.now(tz=BEIJING_TZ)
    return beijing_now
def utc_to_beijing(utc_str):
    """Convert a naive UTC timestamp string into a Beijing-time string.

    Args:
        utc_str: Timestamp formatted as ``YYYY-MM-DD HH:MM:SS`` (the format
            produced by SQLite's ``CURRENT_TIMESTAMP``), interpreted as UTC.

    Returns:
        The same instant shifted to UTC+8 and formatted as
        ``YYYY-MM-DD HH:MM:SS`` without any offset suffix. ``None`` is
        returned for a falsy input, and the input is returned unchanged when
        it cannot be parsed or converted.
    """
    if not utc_str:
        return None
    try:
        naive_utc = datetime.strptime(utc_str, '%Y-%m-%d %H:%M:%S')
        # Attach UTC before shifting so astimezone() does not assume the
        # server's local zone.
        beijing_dt = naive_utc.replace(tzinfo=timezone.utc).astimezone(BEIJING_TZ)
        return beijing_dt.strftime('%Y-%m-%d %H:%M:%S')
    except (TypeError, ValueError, OverflowError):
        # Best effort: hand back the raw value rather than failing the request.
        # Only conversion errors are caught so that KeyboardInterrupt and
        # SystemExit still propagate.
        return utc_str
# Database configuration: the SQLite file lives next to this module.
DB_PATH = os.path.join(os.path.dirname(__file__), 'kinecho.db')
# File upload configuration
UPLOAD_FOLDER = os.path.join(os.path.dirname(__file__), 'uploads')
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'mp4', 'mov', 'avi'}
MAX_FILE_SIZE = 100 * 1024 * 1024  # 100MB
# Make sure the upload directories exist (created at import time)
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(os.path.join(UPLOAD_FOLDER, 'thumbnails'), exist_ok=True)
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['MAX_CONTENT_LENGTH'] = MAX_FILE_SIZE
def allowed_file(filename):
    """Return True when *filename* has an extension listed in ALLOWED_EXTENSIONS.

    The extension is the text after the last dot, compared case-insensitively.
    """
    _, dot, extension = filename.rpartition('.')
    if not dot:
        # No dot at all means there is no extension to check.
        return False
    return extension.lower() in ALLOWED_EXTENSIONS
def generate_video_thumbnail(video_path, filename):
    """Create a JPEG thumbnail for a video by invoking the ``ffmpeg`` binary.

    Args:
        video_path: Path of the video file passed to ffmpeg as input.
        filename: File name whose stem (text before the last dot) names the
            thumbnail, stored as ``<stem>_thumb.jpg`` under
            ``UPLOAD_FOLDER/thumbnails``.

    Returns:
        The thumbnail path on success, or ``None`` when ffmpeg fails, times
        out (30 s) or cannot be started.
    """
    import subprocess

    stem = filename.rsplit('.', 1)[0]
    thumbnail_path = os.path.join(UPLOAD_FOLDER, 'thumbnails', stem + '_thumb.jpg')
    # Grab a single frame at the 1-second mark, scaled to 320 px wide;
    # '-y' overwrites an existing thumbnail.
    ffmpeg_args = [
        'ffmpeg', '-i', video_path,
        '-ss', '00:00:01',
        '-vframes', '1',
        '-vf', 'scale=320:-1',
        '-y', thumbnail_path,
    ]
    try:
        subprocess.run(ffmpeg_args, capture_output=True, check=True, timeout=30)
    except Exception as e:
        print(f'生成视频缩略图失败: {e}')
        return None
    return thumbnail_path
def generate_photo_thumbnail(photo_path, filename):
    """Create a JPEG thumbnail (at most 320x320, aspect ratio kept) for a photo.

    Args:
        photo_path: Path of the source image opened with Pillow.
        filename: File name whose stem (text before the last dot) names the
            thumbnail, stored as ``<stem>_thumb.jpg`` under
            ``UPLOAD_FOLDER/thumbnails``.

    Returns:
        The thumbnail path on success, or ``None`` when the image cannot be
        opened, converted or saved.
    """
    from PIL import Image

    thumbnail_filename = filename.rsplit('.', 1)[0] + '_thumb.jpg'
    thumbnail_path = os.path.join(UPLOAD_FOLDER, 'thumbnails', thumbnail_filename)
    try:
        with Image.open(photo_path) as img:
            # JPEG can only store L, RGB and CMYK data. Every other mode
            # (RGBA, P, LA, I, F, 1, ...) must be converted first, otherwise
            # save() raises and the thumbnail is silently lost.
            if img.mode not in ('RGB', 'L', 'CMYK'):
                img = img.convert('RGB')
            # thumbnail() resizes in place and preserves the aspect ratio.
            img.thumbnail((320, 320))
            img.save(thumbnail_path, 'JPEG', quality=85)
        return thumbnail_path
    except Exception as e:
        print(f'生成图片缩略图失败: {e}')
        return None
def get_db():
    """Open a new SQLite connection to DB_PATH.

    Rows are returned as ``sqlite3.Row`` objects, so columns can be read by
    name and a row can be turned into a dict with ``dict(row)``.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
def init_db():
    """Create every application table and index if it does not exist yet.

    All statements use ``IF NOT EXISTS``, so calling this repeatedly is safe
    for an already-initialised database file. The connection is always
    closed, even when one of the statements fails.
    """
    conn = get_db()
    try:
        cursor = conn.cursor()
        # Users table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_type TEXT NOT NULL, -- 'family''elderly'
                name TEXT NOT NULL,
                phone TEXT,
                family_id TEXT, -- 家庭组ID关联家人和老人
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        # Schedule / care-plan table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS schedules (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                family_id TEXT NOT NULL, -- 家庭组ID
                title TEXT NOT NULL, -- 日程标题
                description TEXT, -- 详细描述
                schedule_type TEXT, -- 类型medication(用药)、exercise(运动)、meal(饮食)、checkup(检查)等
                schedule_time TIMESTAMP NOT NULL, -- 日程时间
                repeat_type TEXT DEFAULT 'once', -- 重复类型once, daily, weekly, monthly
                repeat_days TEXT, -- 重复的星期几JSON格式[1,3,5]
                status TEXT DEFAULT 'pending', -- 状态pending(待执行), completed(已完成), skipped(已放弃), missed(已错过)
                completed_at TIMESTAMP, -- 完成时间
                auto_remind INTEGER DEFAULT 1, -- 数字人自动播报1=启用0=禁用
                is_active INTEGER DEFAULT 1, -- 是否启用
                created_by INTEGER, -- 创建者用户ID
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (created_by) REFERENCES users(id)
            )
        ''')
        # Reminder log table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS reminders (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                schedule_id INTEGER NOT NULL,
                elderly_id INTEGER NOT NULL, -- 老人用户ID
                remind_time TIMESTAMP NOT NULL, -- 提醒时间
                status TEXT DEFAULT 'pending', -- pending, completed, missed, dismissed
                completed_at TIMESTAMP,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (schedule_id) REFERENCES schedules(id),
                FOREIGN KEY (elderly_id) REFERENCES users(id)
            )
        ''')
        # Media file table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS media (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                family_id TEXT NOT NULL, -- 家庭组ID
                media_type TEXT NOT NULL, -- 'photo''video'
                title TEXT NOT NULL, -- 媒体标题
                description TEXT, -- 描述
                file_path TEXT NOT NULL, -- 文件存储路径
                file_size INTEGER, -- 文件大小(字节)
                duration INTEGER, -- 视频时长(秒),仅视频有值
                thumbnail_path TEXT, -- 缩略图路径
                uploaded_by INTEGER, -- 上传者用户ID
                is_active INTEGER DEFAULT 1, -- 是否启用
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (uploaded_by) REFERENCES users(id)
            )
        ''')
        # Media tag table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS media_tags (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                media_id INTEGER NOT NULL,
                tag TEXT NOT NULL, -- 标签内容,如 '孙女小米', '生日', '旅行'
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (media_id) REFERENCES media(id) ON DELETE CASCADE
            )
        ''')
        # Media trigger-policy table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS media_policies (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                media_id INTEGER NOT NULL,
                time_windows TEXT, -- 播放时段JSON格式["07:00-09:00", "19:00-21:00"]
                moods TEXT, -- 适合心境JSON格式["happy", "sad", "calm"]
                occasions TEXT, -- 特殊场合JSON格式["birthday", "anniversary"]
                cooldown INTEGER DEFAULT 60, -- 冷却时间(分钟),避免重复播放
                priority INTEGER DEFAULT 5, -- 优先级 1-10数字越大优先级越高
                last_played_at TIMESTAMP, -- 上次播放时间
                play_count INTEGER DEFAULT 0, -- 播放次数
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (media_id) REFERENCES media(id) ON DELETE CASCADE
            )
        ''')
        # Media play-history table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS media_play_history (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                media_id INTEGER NOT NULL,
                elderly_id INTEGER NOT NULL, -- 老人用户ID
                played_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, -- 播放时间
                duration_watched INTEGER, -- 观看时长(秒)
                completed INTEGER DEFAULT 0, -- 是否看完1=是0=否
                triggered_by TEXT, -- 触发方式:'auto'=自动, 'manual'=手动, 'mood'=情绪触发等
                mood_before TEXT, -- 播放前情绪状态
                mood_after TEXT, -- 播放后情绪状态
                FOREIGN KEY (media_id) REFERENCES media(id),
                FOREIGN KEY (elderly_id) REFERENCES users(id)
            )
        ''')
        # Media feedback table (like / dislike)
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS media_feedback (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                media_id INTEGER NOT NULL,
                elderly_id INTEGER NOT NULL, -- 老人用户ID
                feedback_type TEXT NOT NULL, -- 'like''dislike'
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (media_id) REFERENCES media(id),
                FOREIGN KEY (elderly_id) REFERENCES users(id),
                UNIQUE(media_id, elderly_id) -- 每个老人对每个媒体只能有一个反馈
            )
        ''')
        # Family voice-message table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS family_messages (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                family_id TEXT NOT NULL, -- 家庭组ID
                content TEXT NOT NULL, -- 留言内容
                sender_name TEXT NOT NULL, -- 发送者姓名
                sender_relation TEXT NOT NULL, -- 发送者称呼(儿子、女儿、孙女等)
                scheduled_time TIMESTAMP NOT NULL, -- 预约播报时间
                played INTEGER DEFAULT 0, -- 是否已播放0=未播放1=已播放
                played_at TIMESTAMP, -- 实际播报时间
                liked INTEGER DEFAULT 0, -- 老人是否点赞0=未点赞1=已点赞
                is_active INTEGER DEFAULT 1, -- 是否有效
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        # Family-side message / alert table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS family_alerts (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                family_id TEXT NOT NULL, -- 家庭组ID
                elderly_id INTEGER, -- 老人用户ID可选用于关联具体老人
                alert_type TEXT NOT NULL, -- 消息类型sos_emergency, contact_family, medication, emotion, inactive, emergency
                level TEXT NOT NULL, -- 级别low, medium, high
                title TEXT, -- 消息标题(简短概要)
                message TEXT NOT NULL, -- 消息详细内容
                metadata TEXT, -- 额外元数据JSON格式{"location": "客厅", "device": "平板"}
                source TEXT DEFAULT 'elderly', -- 消息来源elderly(老人端), system(系统自动), family(家属端)
                handled INTEGER DEFAULT 0, -- 是否已处理0=未处理1=已处理
                handled_at TIMESTAMP, -- 处理时间
                handled_by INTEGER, -- 处理人用户ID
                reply_message TEXT, -- 家属回复内容
                read INTEGER DEFAULT 0, -- 是否已读0=未读1=已读
                read_at TIMESTAMP, -- 阅读时间
                is_active INTEGER DEFAULT 1, -- 是否有效
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (elderly_id) REFERENCES users(id),
                FOREIGN KEY (handled_by) REFERENCES users(id)
            )
        ''')
        # Mood record table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS mood_records (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                family_id TEXT NOT NULL, -- 家庭组ID
                elderly_id INTEGER, -- 老人用户ID
                mood_type TEXT NOT NULL, -- 情绪类型happy(开心), calm(平静), sad(难过), anxious(焦虑), angry(生气), tired(疲惫)
                mood_score INTEGER DEFAULT 5, -- 情绪分数 1-10数字越大越积极
                note TEXT, -- 备注说明
                source TEXT DEFAULT 'manual', -- 来源manual(手动记录), ai_detect(AI检测), voice(语音分析)
                trigger_event TEXT, -- 触发事件,如 '看了家人照片', '完成了散步'
                location TEXT, -- 记录地点
                weather TEXT, -- 天气情况
                recorded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, -- 记录时间
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (elderly_id) REFERENCES users(id)
            )
        ''')
        # Indexes for mood records
        cursor.execute('''
            CREATE INDEX IF NOT EXISTS idx_mood_records_family_id
            ON mood_records(family_id)
        ''')
        cursor.execute('''
            CREATE INDEX IF NOT EXISTS idx_mood_records_elderly_id
            ON mood_records(elderly_id)
        ''')
        cursor.execute('''
            CREATE INDEX IF NOT EXISTS idx_mood_records_recorded_at
            ON mood_records(recorded_at DESC)
        ''')
        # Indexes that speed up the alert queries
        cursor.execute('''
            CREATE INDEX IF NOT EXISTS idx_family_alerts_family_id
            ON family_alerts(family_id)
        ''')
        cursor.execute('''
            CREATE INDEX IF NOT EXISTS idx_family_alerts_created_at
            ON family_alerts(created_at DESC)
        ''')
        cursor.execute('''
            CREATE INDEX IF NOT EXISTS idx_family_alerts_handled
            ON family_alerts(handled, created_at DESC)
        ''')
        conn.commit()
    finally:
        # Release the connection even when a statement above fails.
        conn.close()
# ==================== 家人端 API ====================
@app.route('/api/family/schedules', methods=['GET'])
def get_family_schedules():
    """List all active schedules of a family, latest schedule time first.

    Query parameters:
        family_id: Required family-group identifier.

    Returns:
        JSON ``{'schedules': [...]}``; every row also carries ``creator_name``
        looked up in the users table. A 400 error is returned when
        ``family_id`` is missing.
    """
    family_id = request.args.get('family_id')
    if not family_id:
        return jsonify({'error': '缺少family_id参数'}), 400
    conn = get_db()
    try:
        cursor = conn.cursor()
        cursor.execute('''
            SELECT s.*, u.name as creator_name
            FROM schedules s
            LEFT JOIN users u ON s.created_by = u.id
            WHERE s.family_id = ? AND s.is_active = 1
            ORDER BY s.schedule_time DESC
        ''', (family_id,))
        schedules = [dict(row) for row in cursor.fetchall()]
    finally:
        # Close the connection even when the query raises.
        conn.close()
    return jsonify({'schedules': schedules})
@app.route('/api/family/schedules', methods=['POST'])
def create_schedule():
    """Create a schedule from a JSON body.

    Required JSON fields: ``family_id``, ``title``, ``schedule_time``.
    Optional fields and their defaults: ``description`` (''),
    ``schedule_type`` ('other'), ``repeat_type`` ('once'), ``repeat_days``
    (''), ``auto_remind`` (1), ``created_by`` (None).

    Returns:
        ``{'success': True, 'schedule_id': <id>}`` with status 201, or a 400
        error when the body is not a JSON object or a required field is
        missing.
    """
    # silent=True yields None for a missing or malformed body instead of raising.
    data = request.get_json(silent=True)
    required_fields = ['family_id', 'title', 'schedule_time']
    if not isinstance(data, dict) or not all(field in data for field in required_fields):
        return jsonify({'error': '缺少必需字段'}), 400
    conn = get_db()
    try:
        cursor = conn.cursor()
        cursor.execute('''
            INSERT INTO schedules (
                family_id, title, description, schedule_type,
                schedule_time, repeat_type, repeat_days, auto_remind, created_by
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        ''', (
            data['family_id'],
            data['title'],
            data.get('description', ''),
            data.get('schedule_type', 'other'),
            data['schedule_time'],
            data.get('repeat_type', 'once'),
            data.get('repeat_days', ''),
            data.get('auto_remind', 1),
            data.get('created_by'),
        ))
        schedule_id = cursor.lastrowid
        conn.commit()
    finally:
        # Close the connection even when the insert raises.
        conn.close()
    return jsonify({'success': True, 'schedule_id': schedule_id}), 201
@app.route('/api/family/schedules/<int:schedule_id>', methods=['PUT'])
def update_schedule(schedule_id):
    """Update selected columns of one schedule.

    Only these JSON fields are applied: ``title``, ``description``,
    ``schedule_type``, ``schedule_time``, ``repeat_type``, ``repeat_days``,
    ``auto_remind``, ``status``. ``updated_at`` is refreshed as well.

    Returns:
        ``{'success': True}``, or a 400 error when the body contains none of
        the updatable fields (including a missing or non-object body).
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    updatable = (
        'title', 'description', 'schedule_type', 'schedule_time',
        'repeat_type', 'repeat_days', 'auto_remind', 'status',
    )
    present = [field for field in updatable if field in data]
    # Validate before opening the database so the early return cannot leak
    # a connection.
    if not present:
        return jsonify({'error': '没有要更新的字段'}), 400
    # Column names come from the fixed whitelist above; values stay bound
    # parameters.
    update_fields = [f"{field} = ?" for field in present]
    update_fields.append("updated_at = CURRENT_TIMESTAMP")
    params = [data[field] for field in present]
    params.append(schedule_id)
    conn = get_db()
    try:
        cursor = conn.cursor()
        cursor.execute(f'''
            UPDATE schedules
            SET {', '.join(update_fields)}
            WHERE id = ?
        ''', params)
        conn.commit()
    finally:
        conn.close()
    return jsonify({'success': True})
@app.route('/api/family/schedules/<int:schedule_id>', methods=['DELETE'])
def delete_schedule(schedule_id):
    """Soft-delete a schedule by setting ``is_active`` to 0.

    Returns:
        ``{'success': True}`` (also when no row has that id).
    """
    conn = get_db()
    try:
        cursor = conn.cursor()
        cursor.execute('''
            UPDATE schedules
            SET is_active = 0, updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
        ''', (schedule_id,))
        conn.commit()
    finally:
        # Close the connection even when the update raises.
        conn.close()
    return jsonify({'success': True})
# ==================== 家属端消息/告警 API ====================
@app.route('/api/family/alerts', methods=['GET'])
def get_family_alerts():
    """List a family's active alerts with optional filters and paging.

    Query parameters:
        family_id: Required family-group identifier.
        status: Legacy filter, ``unhandled`` or ``handled``.
        handled / read: ``true`` or ``false`` (case-insensitive).
        alert_type, level: Exact-match filters.
        elderly_id: Integer id of the elderly user.
        limit / offset: Paging, defaults 100 and 0.

    Alerts of type ``media_display`` are always excluded.

    Returns:
        JSON with ``alerts`` (newest first, ``handled``/``read`` as booleans
        and ``metadata`` decoded from JSON), ``total``, ``limit`` and
        ``offset``; or a 400 error when ``family_id`` is missing.
    """
    family_id = request.args.get('family_id')
    status = request.args.get('status')  # all, unhandled, handled
    handled = request.args.get('handled')  # 'true' / 'false'
    read = request.args.get('read')  # 'true' / 'false'
    alert_type = request.args.get('alert_type')
    elderly_id = request.args.get('elderly_id', type=int)
    level = request.args.get('level')  # low, medium, high
    limit = request.args.get('limit', 100, type=int)
    offset = request.args.get('offset', 0, type=int)
    if not family_id:
        return jsonify({'error': '缺少family_id参数'}), 400

    # Build the WHERE clause from fixed fragments; user values are bound.
    conditions = ['a.family_id = ?', 'a.is_active = 1']
    params = [family_id]
    # Media display events are not family notifications, so hide them here.
    conditions.append("a.alert_type != 'media_display'")
    # Legacy 'status' parameter
    if status == 'unhandled':
        conditions.append('a.handled = 0')
    elif status == 'handled':
        conditions.append('a.handled = 1')
    # Boolean 'handled' parameter
    if handled is not None:
        if handled.lower() == 'true':
            conditions.append('a.handled = 1')
        elif handled.lower() == 'false':
            conditions.append('a.handled = 0')
    # Boolean 'read' parameter
    if read is not None:
        if read.lower() == 'true':
            conditions.append('a.read = 1')
        elif read.lower() == 'false':
            conditions.append('a.read = 0')
    if alert_type:
        conditions.append('a.alert_type = ?')
        params.append(alert_type)
    if elderly_id:
        conditions.append('a.elderly_id = ?')
        params.append(elderly_id)
    if level:
        conditions.append('a.level = ?')
        params.append(level)
    where_clause = ' AND '.join(conditions)

    conn = get_db()
    try:
        cursor = conn.cursor()
        # Total number of matching rows (ignores paging)
        cursor.execute(f'''
            SELECT COUNT(*) as total FROM family_alerts a
            WHERE {where_clause}
        ''', params)
        total = cursor.fetchone()['total']
        # Page of rows, joined with the elderly user and the handler
        cursor.execute(f'''
            SELECT
                a.*,
                u.name as elderly_name,
                h.name as handler_name
            FROM family_alerts a
            LEFT JOIN users u ON a.elderly_id = u.id
            LEFT JOIN users h ON a.handled_by = h.id
            WHERE {where_clause}
            ORDER BY a.created_at DESC
            LIMIT ? OFFSET ?
        ''', params + [limit, offset])
        rows = cursor.fetchall()
    finally:
        # Close the connection even when a query raises.
        conn.close()

    alerts = []
    for row in rows:
        alert = dict(row)
        alert['handled'] = bool(alert['handled'])
        alert['read'] = bool(alert['read'])
        if alert['metadata']:
            try:
                alert['metadata'] = json.loads(alert['metadata'])
            except (TypeError, ValueError):
                # Corrupt metadata must not break the whole listing.
                alert['metadata'] = {}
        alerts.append(alert)
    return jsonify({
        'alerts': alerts,
        'total': total,
        'limit': limit,
        'offset': offset
    })
@app.route('/api/family/alerts', methods=['POST'])
def create_alert():
    """Create an alert from a JSON body.

    Required JSON fields: ``family_id``, ``alert_type``, ``level``,
    ``message``. Optional: ``elderly_id``, ``title``, ``metadata`` (stored as
    a JSON string, or NULL when empty) and ``source`` (default 'elderly').

    Returns:
        ``{'success': True, 'alert_id': <id>}`` with status 201, or a 400
        error when the body is not a JSON object or a required field is
        missing.
    """
    # silent=True yields None for a missing or malformed body instead of raising.
    data = request.get_json(silent=True)
    required_fields = ['family_id', 'alert_type', 'level', 'message']
    if not isinstance(data, dict) or not all(field in data for field in required_fields):
        return jsonify({'error': '缺少必需字段'}), 400
    metadata = data.get('metadata', {})
    metadata_json = json.dumps(metadata) if metadata else None
    conn = get_db()
    try:
        cursor = conn.cursor()
        cursor.execute('''
            INSERT INTO family_alerts (
                family_id, elderly_id, alert_type, level, title, message,
                metadata, source
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        ''', (
            data['family_id'],
            data.get('elderly_id'),
            data['alert_type'],
            data['level'],
            data.get('title'),
            data['message'],
            metadata_json,
            data.get('source', 'elderly'),
        ))
        alert_id = cursor.lastrowid
        conn.commit()
    finally:
        # Close the connection even when the insert raises.
        conn.close()
    return jsonify({'success': True, 'alert_id': alert_id}), 201
@app.route('/api/family/alerts/<int:alert_id>/handle', methods=['POST'])
def handle_alert(alert_id):
    """Mark an alert as handled.

    The JSON body is optional and may carry ``handled_by`` and
    ``reply_message``; both are stored as NULL when absent.

    Returns:
        ``{'success': True}``.
    """
    # The body is optional, so a missing or non-JSON payload must not be
    # rejected; silent=True makes get_json return None instead of raising.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    conn = get_db()
    try:
        cursor = conn.cursor()
        cursor.execute('''
            UPDATE family_alerts
            SET handled = 1,
                handled_at = CURRENT_TIMESTAMP,
                handled_by = ?,
                reply_message = ?,
                updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
        ''', (
            data.get('handled_by'),
            data.get('reply_message'),
            alert_id,
        ))
        conn.commit()
    finally:
        conn.close()
    return jsonify({'success': True})
@app.route('/api/family/alerts/<int:alert_id>/read', methods=['POST'])
def mark_alert_read(alert_id):
    """Mark an alert as read and stamp ``read_at``.

    Returns:
        ``{'success': True}``.
    """
    conn = get_db()
    try:
        cursor = conn.cursor()
        cursor.execute('''
            UPDATE family_alerts
            SET read = 1,
                read_at = CURRENT_TIMESTAMP,
                updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
        ''', (alert_id,))
        conn.commit()
    finally:
        # Close the connection even when the update raises.
        conn.close()
    return jsonify({'success': True})
@app.route('/api/family/alerts/<int:alert_id>/reply', methods=['POST'])
def reply_alert(alert_id):
    """Store a family member's reply on an alert.

    Required JSON field: ``reply_message``.

    Returns:
        ``{'success': True}``, or a 400 error when the body is not a JSON
        object containing ``reply_message``.
    """
    # silent=True yields None for a missing or malformed body instead of raising.
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'reply_message' not in data:
        return jsonify({'error': '缺少reply_message字段'}), 400
    conn = get_db()
    try:
        cursor = conn.cursor()
        cursor.execute('''
            UPDATE family_alerts
            SET reply_message = ?,
                updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
        ''', (data['reply_message'], alert_id))
        conn.commit()
    finally:
        conn.close()
    return jsonify({'success': True})
@app.route('/api/family/alerts/<int:alert_id>', methods=['DELETE'])
def delete_alert(alert_id):
    """Soft-delete an alert by setting ``is_active`` to 0.

    Returns:
        ``{'success': True}``.
    """
    conn = get_db()
    try:
        cursor = conn.cursor()
        cursor.execute('''
            UPDATE family_alerts
            SET is_active = 0,
                updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
        ''', (alert_id,))
        conn.commit()
    finally:
        # Close the connection even when the update raises.
        conn.close()
    return jsonify({'success': True})
@app.route('/api/family/alerts/stats', methods=['GET'])
def get_alerts_stats():
    """Return alert counters for one family.

    Query parameters:
        family_id: Required family-group identifier.

    Only active alerts are counted and ``media_display`` events are excluded.

    Returns:
        JSON with ``level_stats`` (count per level), ``type_stats`` (count
        per alert type), ``status_stats`` (``unhandled``, ``handled``,
        ``unread``) and ``today_count`` (alerts created during the current
        Beijing calendar day); or a 400 error when ``family_id`` is missing.
    """
    family_id = request.args.get('family_id')
    if not family_id:
        return jsonify({'error': '缺少family_id参数'}), 400
    conn = get_db()
    try:
        cursor = conn.cursor()
        # Count per level
        cursor.execute('''
            SELECT
                level,
                COUNT(*) as count
            FROM family_alerts
            WHERE family_id = ? AND is_active = 1 AND alert_type != 'media_display'
            GROUP BY level
        ''', (family_id,))
        level_stats = {row['level']: row['count'] for row in cursor.fetchall()}
        # Count per alert type
        cursor.execute('''
            SELECT
                alert_type,
                COUNT(*) as count
            FROM family_alerts
            WHERE family_id = ? AND is_active = 1 AND alert_type != 'media_display'
            GROUP BY alert_type
        ''', (family_id,))
        type_stats = {row['alert_type']: row['count'] for row in cursor.fetchall()}
        # Handled / unhandled / unread counters
        cursor.execute('''
            SELECT
                COUNT(CASE WHEN handled = 0 THEN 1 END) as unhandled,
                COUNT(CASE WHEN handled = 1 THEN 1 END) as handled,
                COUNT(CASE WHEN read = 0 THEN 1 END) as unread
            FROM family_alerts
            WHERE family_id = ? AND is_active = 1 AND alert_type != 'media_display'
        ''', (family_id,))
        status_stats = dict(cursor.fetchone())
        # Alerts created today. created_at is filled by CURRENT_TIMESTAMP
        # (UTC), so both sides are shifted to UTC+8 before taking the date;
        # otherwise "today" would switch at 08:00 Beijing time.
        cursor.execute('''
            SELECT COUNT(*) as today_count
            FROM family_alerts
            WHERE family_id = ? AND is_active = 1 AND alert_type != 'media_display'
            AND DATE(created_at, '+8 hours') = DATE('now', '+8 hours')
        ''', (family_id,))
        today_count = cursor.fetchone()['today_count']
    finally:
        # Close the connection even when a query raises.
        conn.close()
    return jsonify({
        'level_stats': level_stats,
        'type_stats': type_stats,
        'status_stats': status_stats,
        'today_count': today_count
    })
# ==================== 家属留言 API ====================
@app.route('/api/family/messages', methods=['GET'])
def get_family_messages():
    """List a family's active messages, newest first.

    Query parameters:
        family_id: Required family-group identifier.

    Returns:
        JSON ``{'messages': [...]}`` with ``played``/``liked`` as booleans
        and ``created_at``, ``updated_at`` and ``played_at`` converted from
        UTC to Beijing time; or a 400 error when ``family_id`` is missing.
    """
    family_id = request.args.get('family_id')
    if not family_id:
        return jsonify({'error': '缺少family_id参数'}), 400
    conn = get_db()
    try:
        cursor = conn.cursor()
        cursor.execute('''
            SELECT * FROM family_messages
            WHERE family_id = ? AND is_active = 1
            ORDER BY created_at DESC
        ''', (family_id,))
        rows = cursor.fetchall()
    finally:
        # Close the connection even when the query raises.
        conn.close()
    messages = []
    for row in rows:
        msg = dict(row)
        msg['played'] = bool(msg['played'])
        msg['liked'] = bool(msg['liked'])
        # Database timestamps are UTC; clients expect Beijing time.
        msg['created_at'] = utc_to_beijing(msg['created_at'])
        msg['updated_at'] = utc_to_beijing(msg['updated_at'])
        if msg.get('played_at'):
            msg['played_at'] = utc_to_beijing(msg['played_at'])
        messages.append(msg)
    return jsonify({'messages': messages})
@app.route('/api/family/messages', methods=['POST'])
def create_message():
    """Create a family message scheduled for later playback.

    Required JSON fields: ``family_id``, ``content``, ``sender_name``,
    ``sender_relation``, ``scheduled_time``.

    Returns:
        ``{'success': True, 'message_id': <id>}`` with status 201, or a 400
        error when the body is not a JSON object or a required field is
        missing.
    """
    # silent=True yields None for a missing or malformed body instead of raising.
    data = request.get_json(silent=True)
    required_fields = ['family_id', 'content', 'sender_name', 'sender_relation', 'scheduled_time']
    if not isinstance(data, dict) or not all(field in data for field in required_fields):
        return jsonify({'error': '缺少必需字段'}), 400
    conn = get_db()
    try:
        cursor = conn.cursor()
        cursor.execute('''
            INSERT INTO family_messages (
                family_id, content, sender_name, sender_relation, scheduled_time
            ) VALUES (?, ?, ?, ?, ?)
        ''', (
            data['family_id'],
            data['content'],
            data['sender_name'],
            data['sender_relation'],
            data['scheduled_time'],
        ))
        message_id = cursor.lastrowid
        conn.commit()
    finally:
        # Close the connection even when the insert raises.
        conn.close()
    return jsonify({'success': True, 'message_id': message_id}), 201
@app.route('/api/family/messages/<int:message_id>', methods=['DELETE'])
def delete_message(message_id):
    """Soft-delete a family message by setting ``is_active`` to 0.

    Returns:
        ``{'success': True}``.
    """
    conn = get_db()
    try:
        cursor = conn.cursor()
        cursor.execute('''
            UPDATE family_messages
            SET is_active = 0, updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
        ''', (message_id,))
        conn.commit()
    finally:
        # Close the connection even when the update raises.
        conn.close()
    return jsonify({'success': True})
# ==================== 老人端留言 API ====================
@app.route('/api/elderly/messages', methods=['GET'])
def get_elderly_messages():
    """List a family's active messages ordered by scheduled time (ascending).

    Query parameters:
        family_id: Required family-group identifier.

    Returns:
        JSON ``{'messages': [...]}`` with ``played``/``liked`` as booleans
        and ``created_at``, ``updated_at`` and ``played_at`` converted from
        UTC to Beijing time; or a 400 error when ``family_id`` is missing.
    """
    family_id = request.args.get('family_id')
    if not family_id:
        return jsonify({'error': '缺少family_id参数'}), 400
    conn = get_db()
    try:
        cursor = conn.cursor()
        cursor.execute('''
            SELECT * FROM family_messages
            WHERE family_id = ? AND is_active = 1
            ORDER BY scheduled_time ASC
        ''', (family_id,))
        rows = cursor.fetchall()
    finally:
        # Close the connection even when the query raises.
        conn.close()
    messages = []
    for row in rows:
        msg = dict(row)
        msg['played'] = bool(msg['played'])
        msg['liked'] = bool(msg['liked'])
        # Database timestamps are UTC; clients expect Beijing time.
        msg['created_at'] = utc_to_beijing(msg['created_at'])
        msg['updated_at'] = utc_to_beijing(msg['updated_at'])
        if msg.get('played_at'):
            msg['played_at'] = utc_to_beijing(msg['played_at'])
        messages.append(msg)
    return jsonify({'messages': messages})
@app.route('/api/elderly/messages/pending', methods=['GET'])
def get_pending_messages():
    """List unplayed messages whose scheduled time has already passed.

    Query parameters:
        family_id: Required family-group identifier.

    ``scheduled_time`` is compared as naive Beijing time against the current
    Beijing time. Values may use ``T`` or a space as separator and may omit
    the seconds; rows whose time cannot be parsed are skipped.

    Returns:
        JSON ``{'messages': [...]}`` ordered by scheduled time, with
        ``played``/``liked`` as booleans and the UTC bookkeeping timestamps
        converted to Beijing time; or a 400 error when ``family_id`` is
        missing.
    """
    family_id = request.args.get('family_id')
    if not family_id:
        return jsonify({'error': '缺少family_id参数'}), 400
    conn = get_db()
    try:
        cursor = conn.cursor()
        # Debug logging uses Beijing time
        beijing_now = get_beijing_time()
        current_time = beijing_now.strftime('%Y-%m-%d %H:%M:%S')
        print(f"[DEBUG] 当前北京时间: {current_time}")
        cursor.execute('''
            SELECT * FROM family_messages
            WHERE family_id = ?
            AND is_active = 1
            AND played = 0
            ORDER BY scheduled_time ASC
        ''', (family_id,))
        all_messages = []
        for row in cursor.fetchall():
            msg = dict(row)
            print(f"[DEBUG] 留言 ID: {msg['id']}, 预约时间: {msg['scheduled_time']}")
            all_messages.append(msg)
    finally:
        # The rows are fully materialised, so the connection is no longer
        # needed for the filtering below.
        conn.close()

    # Compare in Python; drop tzinfo so the naive stored values are comparable.
    now = beijing_now.replace(tzinfo=None)
    messages = []
    for msg in all_messages:
        try:
            scheduled_str = msg['scheduled_time']
            # Normalise 'YYYY-MM-DDTHH:MM' to a space-separated form
            scheduled_str = scheduled_str.replace('T', ' ')
            if len(scheduled_str) == 16:  # YYYY-MM-DD HH:MM, seconds missing
                scheduled_str += ':00'
            scheduled_time = datetime.strptime(scheduled_str, '%Y-%m-%d %H:%M:%S')
            print(f"[DEBUG] 解析后时间: {scheduled_time}, 当前北京时间: {now}, 已到期: {scheduled_time <= now}")
            if scheduled_time <= now:
                msg['played'] = bool(msg['played'])
                msg['liked'] = bool(msg['liked'])
                # Database timestamps are UTC; clients expect Beijing time.
                msg['created_at'] = utc_to_beijing(msg['created_at'])
                msg['updated_at'] = utc_to_beijing(msg['updated_at'])
                if msg.get('played_at'):
                    msg['played_at'] = utc_to_beijing(msg['played_at'])
                messages.append(msg)
        except (AttributeError, TypeError, ValueError) as e:
            # An unparseable scheduled_time skips this row only.
            print(f"[DEBUG] 时间解析错误: {e}, 原始值: {msg['scheduled_time']}")
            continue
    print(f"[DEBUG] 找到 {len(messages)} 条待播放留言")
    return jsonify({'messages': messages})
@app.route('/api/elderly/messages/<int:message_id>/play', methods=['POST'])
def play_message(message_id):
    """Mark a message as played and stamp ``played_at``.

    Returns:
        ``{'success': True}``.
    """
    conn = get_db()
    try:
        cursor = conn.cursor()
        cursor.execute('''
            UPDATE family_messages
            SET played = 1,
                played_at = CURRENT_TIMESTAMP,
                updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
        ''', (message_id,))
        conn.commit()
    finally:
        # Close the connection even when the update raises.
        conn.close()
    return jsonify({'success': True})
@app.route('/api/elderly/messages/<int:message_id>/like', methods=['POST'])
def like_message(message_id):
    """Set the ``liked`` flag of a message to 1.

    Returns:
        ``{'success': True}``.
    """
    conn = get_db()
    try:
        cursor = conn.cursor()
        cursor.execute('''
            UPDATE family_messages
            SET liked = 1, updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
        ''', (message_id,))
        conn.commit()
    finally:
        # Close the connection even when the update raises.
        conn.close()
    return jsonify({'success': True})
@app.route('/api/elderly/messages/<int:message_id>/unlike', methods=['POST'])
def unlike_message(message_id):
    """Reset the ``liked`` flag of a message to 0.

    Returns:
        ``{'success': True}``.
    """
    conn = get_db()
    try:
        cursor = conn.cursor()
        cursor.execute('''
            UPDATE family_messages
            SET liked = 0, updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
        ''', (message_id,))
        conn.commit()
    finally:
        # Close the connection even when the update raises.
        conn.close()
    return jsonify({'success': True})
# ==================== 老人端消息/告警 API ====================
@app.route('/api/elderly/alerts', methods=['POST'])
def create_elderly_alert():
    """Create an alert on behalf of the elderly client (e.g. SOS, contact family).

    Required JSON fields: ``family_id``, ``alert_type``, ``level``,
    ``message``. Optional: ``elderly_id``, ``title``, ``metadata`` (stored as
    a JSON string, or NULL when empty). ``source`` is always 'elderly'.

    Returns:
        ``{'success': True, 'alert_id': <id>}`` with status 201, or a 400
        error when the body is not a JSON object or a required field is
        missing.
    """
    # silent=True yields None for a missing or malformed body instead of raising.
    data = request.get_json(silent=True)
    required_fields = ['family_id', 'alert_type', 'level', 'message']
    if not isinstance(data, dict) or not all(field in data for field in required_fields):
        return jsonify({'error': '缺少必需字段'}), 400
    metadata = data.get('metadata', {})
    metadata_json = json.dumps(metadata) if metadata else None
    conn = get_db()
    try:
        cursor = conn.cursor()
        cursor.execute('''
            INSERT INTO family_alerts (
                family_id, elderly_id, alert_type, level, title, message,
                metadata, source
            ) VALUES (?, ?, ?, ?, ?, ?, ?, 'elderly')
        ''', (
            data['family_id'],
            data.get('elderly_id'),
            data['alert_type'],
            data['level'],
            data.get('title'),
            data['message'],
            metadata_json,
        ))
        alert_id = cursor.lastrowid
        conn.commit()
    finally:
        # Close the connection even when the insert raises.
        conn.close()
    return jsonify({'success': True, 'alert_id': alert_id}), 201
@app.route('/api/elderly/alerts/replies', methods=['GET'])
def get_elderly_alert_replies():
    """Return up to 10 active alerts that carry a family reply.

    Query parameters:
        family_id: Required family-group identifier.
        elderly_id: Optional filter on the elderly user.

    Returns:
        JSON ``{'replies': [...]}`` ordered by ``handled_at`` descending, or
        a 400 error when ``family_id`` is missing.
    """
    family_id = request.args.get('family_id')
    elderly_id = request.args.get('elderly_id')
    if not family_id:
        return jsonify({'error': '缺少family_id参数'}), 400
    conditions = ['family_id = ?', 'is_active = 1', 'reply_message IS NOT NULL']
    params = [family_id]
    if elderly_id:
        conditions.append('elderly_id = ?')
        params.append(elderly_id)
    where_clause = ' AND '.join(conditions)
    conn = get_db()
    try:
        cursor = conn.cursor()
        cursor.execute(f'''
            SELECT
                id, alert_type, level, message, reply_message,
                handled_at, created_at
            FROM family_alerts
            WHERE {where_clause}
            ORDER BY handled_at DESC
            LIMIT 10
        ''', params)
        replies = [dict(row) for row in cursor.fetchall()]
    finally:
        # Close the connection even when the query raises.
        conn.close()
    return jsonify({'replies': replies})
# ==================== 情绪记录 API ====================
@app.route('/api/elderly/moods', methods=['POST'])
def create_mood_record():
    """Create a mood record from a JSON body.

    Required JSON fields: ``family_id`` and ``mood_type`` (one of happy,
    calm, sad, anxious, angry, tired). Optional: ``elderly_id``,
    ``mood_score`` (number from 1 to 10, default 5), ``note``, ``source``
    (default 'manual'), ``trigger_event``, ``location``, ``weather`` and
    ``recorded_at`` (defaults to the server's current local time).

    Returns:
        ``{'success': True, 'record_id': <id>}`` with status 201, or a 400
        error for a missing/invalid body, an unknown mood type, or a score
        that is not a number within 1-10.
    """
    # silent=True yields None for a missing or malformed body instead of raising.
    data = request.get_json(silent=True)
    required_fields = ['family_id', 'mood_type']
    if not isinstance(data, dict) or not all(field in data for field in required_fields):
        return jsonify({'error': '缺少必需字段'}), 400
    valid_moods = ['happy', 'calm', 'sad', 'anxious', 'angry', 'tired']
    if data['mood_type'] not in valid_moods:
        return jsonify({'error': '无效的情绪类型'}), 400
    mood_score = data.get('mood_score', 5)
    # Reject non-numeric scores (strings, null, booleans) up front; comparing
    # them with integers would otherwise raise or give a meaningless result.
    is_number = isinstance(mood_score, (int, float)) and not isinstance(mood_score, bool)
    if not is_number or not (1 <= mood_score <= 10):
        return jsonify({'error': '情绪分数必须在1-10之间'}), 400
    conn = get_db()
    try:
        cursor = conn.cursor()
        cursor.execute('''
            INSERT INTO mood_records (
                family_id, elderly_id, mood_type, mood_score, note,
                source, trigger_event, location, weather, recorded_at
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ''', (
            data['family_id'],
            data.get('elderly_id'),
            data['mood_type'],
            mood_score,
            data.get('note', ''),
            data.get('source', 'manual'),
            data.get('trigger_event', ''),
            data.get('location', ''),
            data.get('weather', ''),
            data.get('recorded_at', datetime.now().strftime('%Y-%m-%d %H:%M:%S')),
        ))
        record_id = cursor.lastrowid
        conn.commit()
    finally:
        # Close the connection even when the insert raises.
        conn.close()
    return jsonify({'success': True, 'record_id': record_id}), 201
@app.route('/api/elderly/moods', methods=['GET'])
def get_elderly_moods():
    """List mood records of a family with paging, newest first.

    Query parameters:
        family_id: Required family-group identifier.
        elderly_id: Optional filter on the elderly user.
        limit / offset: Paging, defaults 50 and 0.

    Returns:
        JSON with ``records``, ``total`` (matches ignoring paging), ``limit``
        and ``offset``; or a 400 error when ``family_id`` is missing.
    """
    family_id = request.args.get('family_id')
    elderly_id = request.args.get('elderly_id')
    limit = request.args.get('limit', 50, type=int)
    offset = request.args.get('offset', 0, type=int)
    if not family_id:
        return jsonify({'error': '缺少family_id参数'}), 400
    conditions = ['family_id = ?']
    params = [family_id]
    if elderly_id:
        conditions.append('elderly_id = ?')
        params.append(elderly_id)
    where_clause = ' AND '.join(conditions)
    conn = get_db()
    try:
        cursor = conn.cursor()
        # Total number of matching rows
        cursor.execute(f'''
            SELECT COUNT(*) as total FROM mood_records
            WHERE {where_clause}
        ''', params)
        total = cursor.fetchone()['total']
        # Requested page
        cursor.execute(f'''
            SELECT * FROM mood_records
            WHERE {where_clause}
            ORDER BY recorded_at DESC
            LIMIT ? OFFSET ?
        ''', params + [limit, offset])
        records = [dict(row) for row in cursor.fetchall()]
    finally:
        # Close the connection even when a query raises.
        conn.close()
    return jsonify({
        'records': records,
        'total': total,
        'limit': limit,
        'offset': offset
    })
@app.route('/api/elderly/moods/today', methods=['GET'])
def get_today_moods():
    """List mood records whose ``recorded_at`` date equals today's date.

    "Today" is taken from the server's local clock (``datetime.now()``).

    Query parameters:
        family_id: Required family-group identifier.
        elderly_id: Optional filter on the elderly user.

    Returns:
        JSON ``{'records': [...]}`` newest first, or a 400 error when
        ``family_id`` is missing.
    """
    family_id = request.args.get('family_id')
    elderly_id = request.args.get('elderly_id')
    if not family_id:
        return jsonify({'error': '缺少family_id参数'}), 400
    today = datetime.now().strftime('%Y-%m-%d')
    conditions = ['family_id = ?', 'DATE(recorded_at) = DATE(?)']
    params = [family_id, today]
    if elderly_id:
        conditions.append('elderly_id = ?')
        params.append(elderly_id)
    where_clause = ' AND '.join(conditions)
    conn = get_db()
    try:
        cursor = conn.cursor()
        cursor.execute(f'''
            SELECT * FROM mood_records
            WHERE {where_clause}
            ORDER BY recorded_at DESC
        ''', params)
        records = [dict(row) for row in cursor.fetchall()]
    finally:
        # Close the connection even when the query raises.
        conn.close()
    return jsonify({'records': records})
@app.route('/api/elderly/moods/latest', methods=['GET'])
def get_latest_mood():
    """Return the most recently recorded mood of a family.

    Query parameters:
        family_id: Required family-group identifier.
        elderly_id: Optional filter on the elderly user.

    Returns:
        JSON ``{'record': {...}}``, ``{'record': None}`` when nothing
        matches, or a 400 error when ``family_id`` is missing.
    """
    family_id = request.args.get('family_id')
    elderly_id = request.args.get('elderly_id')
    if not family_id:
        return jsonify({'error': '缺少family_id参数'}), 400
    conditions = ['family_id = ?']
    params = [family_id]
    if elderly_id:
        conditions.append('elderly_id = ?')
        params.append(elderly_id)
    where_clause = ' AND '.join(conditions)
    conn = get_db()
    try:
        cursor = conn.cursor()
        cursor.execute(f'''
            SELECT * FROM mood_records
            WHERE {where_clause}
            ORDER BY recorded_at DESC
            LIMIT 1
        ''', params)
        row = cursor.fetchone()
    finally:
        # Close the connection even when the query raises.
        conn.close()
    return jsonify({'record': dict(row) if row else None})
# ==================== 家属端情绪记录 API ====================
@app.route('/api/family/moods', methods=['GET'])
def get_family_moods():
    """List mood records for the family client with filters and paging.

    Query parameters:
        family_id: Required family-group identifier.
        elderly_id, mood_type: Optional exact-match filters.
        start_date / end_date: Optional inclusive bounds on the date part of
            ``recorded_at``.
        limit / offset: Paging, defaults 100 and 0.

    Returns:
        JSON with ``records`` (newest first, each with ``elderly_name`` from
        the users table), ``total``, ``limit`` and ``offset``; or a 400 error
        when ``family_id`` is missing.
    """
    family_id = request.args.get('family_id')
    elderly_id = request.args.get('elderly_id')
    mood_type = request.args.get('mood_type')
    start_date = request.args.get('start_date')
    end_date = request.args.get('end_date')
    limit = request.args.get('limit', 100, type=int)
    offset = request.args.get('offset', 0, type=int)
    if not family_id:
        return jsonify({'error': '缺少family_id参数'}), 400
    # WHERE fragments are fixed strings; user values are bound parameters.
    conditions = ['m.family_id = ?']
    params = [family_id]
    if elderly_id:
        conditions.append('m.elderly_id = ?')
        params.append(elderly_id)
    if mood_type:
        conditions.append('m.mood_type = ?')
        params.append(mood_type)
    if start_date:
        conditions.append('DATE(m.recorded_at) >= DATE(?)')
        params.append(start_date)
    if end_date:
        conditions.append('DATE(m.recorded_at) <= DATE(?)')
        params.append(end_date)
    where_clause = ' AND '.join(conditions)
    conn = get_db()
    try:
        cursor = conn.cursor()
        # Total number of matching rows
        cursor.execute(f'''
            SELECT COUNT(*) as total FROM mood_records m
            WHERE {where_clause}
        ''', params)
        total = cursor.fetchone()['total']
        # Requested page, joined with the elderly user's name
        cursor.execute(f'''
            SELECT
                m.*,
                u.name as elderly_name
            FROM mood_records m
            LEFT JOIN users u ON m.elderly_id = u.id
            WHERE {where_clause}
            ORDER BY m.recorded_at DESC
            LIMIT ? OFFSET ?
        ''', params + [limit, offset])
        records = [dict(row) for row in cursor.fetchall()]
    finally:
        # Close the connection even when a query raises.
        conn.close()
    return jsonify({
        'records': records,
        'total': total,
        'limit': limit,
        'offset': offset
    })
@app.route('/api/family/moods/stats', methods=['GET'])
def get_mood_stats():
    """Return aggregated mood statistics for the last N days.

    Query parameters:
        family_id: Required.
        elderly_id: Optional. Restrict to one elderly user.
        days: Size of the look-back window in days (default 7).

    Returns:
        JSON with ``mood_type_stats`` (count and average score per mood
        type), ``daily_stats`` (average score and count per date),
        ``overall`` (total, average, max and min score in the window),
        ``today_count`` (records whose date equals SQLite's current date)
        and the ``days`` value that was used; a 400 error when
        ``family_id`` is missing.
    """
    family_id = request.args.get('family_id')
    elderly_id = request.args.get('elderly_id')
    days = request.args.get('days', 7, type=int)

    if not family_id:
        return jsonify({'error': '缺少family_id参数'}), 400

    def rounded(value):
        """Round an average to one decimal; NULL/zero averages become 0."""
        return round(value, 1) if value else 0

    base_conditions = ['family_id = ?']
    base_params = [family_id]
    if elderly_id:
        base_conditions.append('elderly_id = ?')
        base_params.append(elderly_id)

    # Window filter: the date modifier is bound as a parameter instead of
    # being interpolated into the SQL text.
    window_clause = ' AND '.join(
        base_conditions + ["DATE(recorded_at) >= DATE('now', ?)"]
    )
    window_params = base_params + [f'-{days} days']

    # "Today" filter is built on its own rather than by string-replacing
    # the window condition inside an already assembled clause.
    today_clause = ' AND '.join(
        base_conditions + ["DATE(recorded_at) = DATE('now')"]
    )

    conn = get_db()
    try:
        cursor = conn.cursor()

        # Per mood type: count and average score.
        cursor.execute(f'''
            SELECT
                mood_type,
                COUNT(*) as count,
                AVG(mood_score) as avg_score
            FROM mood_records
            WHERE {window_clause}
            GROUP BY mood_type
            ORDER BY count DESC
        ''', window_params)
        mood_type_stats = []
        for row in cursor.fetchall():
            stat = dict(row)
            stat['avg_score'] = rounded(stat['avg_score'])
            mood_type_stats.append(stat)

        # Per day: average score and count.
        cursor.execute(f'''
            SELECT
                DATE(recorded_at) as date,
                AVG(mood_score) as avg_score,
                COUNT(*) as count
            FROM mood_records
            WHERE {window_clause}
            GROUP BY DATE(recorded_at)
            ORDER BY date DESC
        ''', window_params)
        daily_stats = []
        for row in cursor.fetchall():
            stat = dict(row)
            stat['avg_score'] = rounded(stat['avg_score'])
            daily_stats.append(stat)

        # Whole-window summary.
        cursor.execute(f'''
            SELECT
                COUNT(*) as total_records,
                AVG(mood_score) as avg_score,
                MAX(mood_score) as max_score,
                MIN(mood_score) as min_score
            FROM mood_records
            WHERE {window_clause}
        ''', window_params)
        overall = dict(cursor.fetchone())
        overall['avg_score'] = rounded(overall['avg_score'])

        # Number of records dated today.
        cursor.execute(f'''
            SELECT COUNT(*) as today_count
            FROM mood_records
            WHERE {today_clause}
        ''', base_params)
        today_count = cursor.fetchone()['today_count']
    finally:
        # Release the connection even when a query raises.
        conn.close()

    return jsonify({
        'mood_type_stats': mood_type_stats,
        'daily_stats': daily_stats,
        'overall': overall,
        'today_count': today_count,
        'days': days
    })
@app.route('/api/family/moods/trend', methods=['GET'])
def get_mood_trend():
    """Return the per-day, per-mood-type trend for the last N days.

    Query parameters:
        family_id: Required.
        elderly_id: Optional. Restrict to one elderly user.
        days: Size of the look-back window in days (default 30).

    Returns:
        JSON with ``trend`` (one entry per date and mood type carrying the
        average score rounded to one decimal and the record count, ordered
        by date ascending then count descending) and ``days``; a 400 error
        when ``family_id`` is missing.
    """
    family_id = request.args.get('family_id')
    elderly_id = request.args.get('elderly_id')
    days = request.args.get('days', 30, type=int)

    if not family_id:
        return jsonify({'error': '缺少family_id参数'}), 400

    conditions = ['family_id = ?']
    params = [family_id]
    if elderly_id:
        conditions.append('elderly_id = ?')
        params.append(elderly_id)
    # Bind the date modifier instead of formatting it into the SQL text.
    conditions.append("DATE(recorded_at) >= DATE('now', ?)")
    params.append(f'-{days} days')
    where_clause = ' AND '.join(conditions)

    conn = get_db()
    try:
        cursor = conn.cursor()
        cursor.execute(f'''
            SELECT
                DATE(recorded_at) as date,
                mood_type,
                AVG(mood_score) as avg_score,
                COUNT(*) as count
            FROM mood_records
            WHERE {where_clause}
            GROUP BY DATE(recorded_at), mood_type
            ORDER BY date ASC, count DESC
        ''', params)
        trend_data = []
        for row in cursor.fetchall():
            item = dict(row)
            # NULL or zero averages are reported as 0.
            item['avg_score'] = round(item['avg_score'], 1) if item['avg_score'] else 0
            trend_data.append(item)
    finally:
        # Release the connection even when the query raises.
        conn.close()

    return jsonify({
        'trend': trend_data,
        'days': days
    })
# ==================== 老人端 API ====================
@app.route('/api/elderly/schedules/today', methods=['GET'])
def get_today_schedules():
    """Return the active schedules that apply to today for a family.

    A schedule applies when it is a one-off dated today, a daily schedule,
    or a weekly schedule whose ``repeat_days`` JSON array contains today's
    weekday number (SQLite ``%w``: 0 = Sunday).

    Query parameters:
        family_id: Required.

    Returns:
        JSON ``{'schedules': [...]}`` ordered by time of day; a 400 error
        when ``family_id`` is missing.
    """
    family_id = request.args.get('family_id')
    if not family_id:
        return jsonify({'error': '缺少family_id参数'}), 400

    # "Today" is taken from the file's Beijing-time helper so the result
    # does not depend on the host's local timezone.
    # NOTE(review): assumes schedule_time is stored as Beijing wall-clock
    # time — confirm against the code that writes schedules.
    today = get_beijing_time().strftime('%Y-%m-%d')

    conn = get_db()
    try:
        cursor = conn.cursor()
        cursor.execute('''
            SELECT * FROM schedules
            WHERE family_id = ?
            AND is_active = 1
            AND (
                (repeat_type = 'once' AND DATE(schedule_time) = DATE(?))
                OR repeat_type = 'daily'
                OR (repeat_type = 'weekly' AND CAST(strftime('%w', ?) AS INTEGER) IN (
                    SELECT value FROM json_each(repeat_days)
                ))
            )
            ORDER BY TIME(schedule_time)
        ''', (family_id, today, today))
        schedules = [dict(row) for row in cursor.fetchall()]
    finally:
        # Release the connection even when the query raises.
        conn.close()

    return jsonify({'schedules': schedules})
@app.route('/api/elderly/schedules/upcoming', methods=['GET'])
def get_upcoming_schedules():
    """Return active schedules due within the next hour for a family.

    Query parameters:
        family_id: Required.

    Returns:
        JSON ``{'schedules': [...]}`` ordered by ``schedule_time``; a 400
        error when ``family_id`` is missing.
    """
    family_id = request.args.get('family_id')
    if not family_id:
        return jsonify({'error': '缺少family_id参数'}), 400

    conn = get_db()
    try:
        cursor = conn.cursor()
        # NOTE(review): datetime('now') is UTC in SQLite; confirm that
        # schedule_time is stored in UTC as well.
        cursor.execute('''
            SELECT * FROM schedules
            WHERE family_id = ?
            AND is_active = 1
            AND datetime(schedule_time) BETWEEN datetime('now') AND datetime('now', '+1 hour')
            ORDER BY schedule_time
        ''', (family_id,))
        schedules = [dict(row) for row in cursor.fetchall()]
    finally:
        # Release the connection even when the query raises.
        conn.close()

    return jsonify({'schedules': schedules})
@app.route('/api/elderly/reminders/<int:reminder_id>/complete', methods=['POST'])
def complete_reminder(reminder_id):
    """Mark a reminder as completed and stamp its completion time.

    Args:
        reminder_id: Primary key of the reminder, taken from the URL.

    Returns:
        JSON ``{'success': True}``; the response does not depend on
        whether a matching row existed.
    """
    mark_completed_sql = '''
        UPDATE reminders
        SET status = 'completed', completed_at = CURRENT_TIMESTAMP
        WHERE id = ?
    '''
    db = get_db()
    cur = db.cursor()
    cur.execute(mark_completed_sql, (reminder_id,))
    db.commit()
    db.close()
    result = {'success': True}
    return jsonify(result)
@app.route('/api/elderly/reminders/<int:reminder_id>/dismiss', methods=['POST'])
def dismiss_reminder(reminder_id):
    """Mark a reminder as dismissed.

    Args:
        reminder_id: Primary key of the reminder, taken from the URL.

    Returns:
        JSON ``{'success': True}``; the response does not depend on
        whether a matching row existed.
    """
    mark_dismissed_sql = '''
        UPDATE reminders
        SET status = 'dismissed'
        WHERE id = ?
    '''
    db = get_db()
    cur = db.cursor()
    cur.execute(mark_dismissed_sql, (reminder_id,))
    db.commit()
    db.close()
    result = {'success': True}
    return jsonify(result)
@app.route('/api/elderly/schedules/<int:schedule_id>/status', methods=['POST'])
def update_schedule_status(schedule_id):
    """Set the status of a schedule.

    Args:
        schedule_id: Primary key of the schedule, taken from the URL.

    Request JSON:
        status: One of ``pending``, ``completed``, ``skipped``, ``missed``.

    Returns:
        JSON ``{'success': True}``, or a 400 error when the status is
        missing or invalid (including a missing or non-object body).
    """
    # A missing or non-object body is handled as "no status supplied"
    # instead of crashing on None.get().
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    status = data.get('status')
    if status not in ('pending', 'completed', 'skipped', 'missed'):
        return jsonify({'error': '无效的状态值'}), 400

    # Only fixed SQL fragments are assembled here; values are bound.
    assignments = ['status = ?']
    if status == 'completed':
        # Record when the schedule was completed.
        assignments.append('completed_at = CURRENT_TIMESTAMP')
    assignments.append('updated_at = CURRENT_TIMESTAMP')

    conn = get_db()
    try:
        cursor = conn.cursor()
        cursor.execute(f'''
            UPDATE schedules
            SET {', '.join(assignments)}
            WHERE id = ?
        ''', (status, schedule_id))
        conn.commit()
    finally:
        # Release the connection even when the update raises.
        conn.close()

    return jsonify({'success': True})
# ==================== 用户管理 API ====================
@app.route('/api/users', methods=['POST'])
def create_user():
    """Create a user record.

    Request JSON:
        user_type: Required.
        name: Required.
        family_id: Required.
        phone: Optional, stored as an empty string when absent.

    Returns:
        JSON ``{'success': True, 'user_id': <new id>}`` with status 201,
        or a 400 error when a required field is missing (including a
        missing or non-object body).
    """
    # A missing or non-object body is reported as missing fields instead
    # of raising on the membership test.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    required_fields = ['user_type', 'name', 'family_id']
    if not all(field in data for field in required_fields):
        return jsonify({'error': '缺少必需字段'}), 400

    conn = get_db()
    try:
        cursor = conn.cursor()
        cursor.execute('''
            INSERT INTO users (user_type, name, phone, family_id)
            VALUES (?, ?, ?, ?)
        ''', (
            data['user_type'],
            data['name'],
            data.get('phone', ''),
            data['family_id']
        ))
        user_id = cursor.lastrowid
        conn.commit()
    finally:
        # Release the connection even when the insert raises.
        conn.close()

    return jsonify({'success': True, 'user_id': user_id}), 201
@app.route('/api/users/<string:family_id>', methods=['GET'])
def get_family_users(family_id):
    """List the users that belong to a family.

    Args:
        family_id: Family identifier, taken from the URL.

    Returns:
        JSON ``{'users': [...]}`` ordered by user type, then creation time.
    """
    members_sql = '''
        SELECT * FROM users
        WHERE family_id = ?
        ORDER BY user_type, created_at
    '''
    db = get_db()
    cur = db.cursor()
    cur.execute(members_sql, (family_id,))
    members = []
    for record in cur.fetchall():
        members.append(dict(record))
    db.close()
    return jsonify({'users': members})
# ==================== 媒体库 API ====================
@app.route('/api/family/media', methods=['POST'])
def upload_media():
    """Store an uploaded photo or video and register it in the media library.

    Multipart form fields:
        file: Required. Its extension must pass ``allowed_file``.
        family_id: Required.
        title: Required.
        description: Optional, defaults to an empty string.
        uploaded_by: Optional.

    The file is saved under the upload folder with a timestamp prefix, a
    thumbnail is generated, and a ``media`` row plus a default
    ``media_policies`` row (empty filters, cooldown 60, priority 5) are
    inserted.

    Returns:
        JSON with ``success``, ``media_id``, ``file_path`` and
        ``media_type`` with status 201, or a 400 error for a missing or
        unsupported file or missing required fields.
    """
    if 'file' not in request.files:
        return jsonify({'error': '没有上传文件'}), 400

    file = request.files['file']
    if file.filename == '':
        return jsonify({'error': '文件名为空'}), 400
    if not allowed_file(file.filename):
        return jsonify({'error': '不支持的文件类型'}), 400

    family_id = request.form.get('family_id')
    title = request.form.get('title')
    description = request.form.get('description', '')
    uploaded_by = request.form.get('uploaded_by')
    if not family_id or not title:
        return jsonify({'error': '缺少必需字段'}), 400

    # Take the extension from the original name, which allowed_file() has
    # already validated. secure_filename() drops non-ASCII characters, so a
    # name such as "照片.jpg" collapses to "jpg": the old code then raised
    # IndexError on rsplit and saved the file without an extension.
    ext = file.filename.rsplit('.', 1)[1].lower()
    filename = secure_filename(file.filename)
    if '.' not in filename or filename.rsplit('.', 1)[1].lower() != ext:
        filename = f'upload.{ext}'

    timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
    unique_filename = f"{timestamp}_{filename}"
    file_path = os.path.join(app.config['UPLOAD_FOLDER'], unique_filename)
    file.save(file_path)

    media_type = 'video' if ext in {'mp4', 'mov', 'avi'} else 'photo'
    file_size = os.path.getsize(file_path)

    # Generate a thumbnail with the helper matching the media type.
    if media_type == 'video':
        thumbnail_path = generate_video_thumbnail(file_path, unique_filename)
    else:
        thumbnail_path = generate_photo_thumbnail(file_path, unique_filename)

    conn = get_db()
    try:
        cursor = conn.cursor()
        cursor.execute('''
            INSERT INTO media (
                family_id, media_type, title, description,
                file_path, file_size, thumbnail_path, uploaded_by
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        ''', (family_id, media_type, title, description,
              file_path, file_size, thumbnail_path, uploaded_by))
        media_id = cursor.lastrowid

        # Default trigger policy for the new media item.
        cursor.execute('''
            INSERT INTO media_policies (media_id, time_windows, moods, occasions, cooldown, priority)
            VALUES (?, ?, ?, ?, ?, ?)
        ''', (media_id, '[]', '[]', '[]', 60, 5))
        conn.commit()
    finally:
        # Release the connection even when an insert raises.
        conn.close()

    return jsonify({
        'success': True,
        'media_id': media_id,
        'file_path': file_path,
        'media_type': media_type
    }), 201
@app.route('/api/family/media', methods=['GET'])
def get_family_media():
    """List a family's active media with tags and trigger policy.

    Query parameters:
        family_id: Required.

    Returns:
        JSON ``{'media': [...]}`` ordered newest first. Each entry holds
        the media columns, the policy columns (``time_windows``, ``moods``
        and ``occasions`` decoded from JSON, empty list on missing or
        malformed data) and ``tags`` as a list; a 400 error when
        ``family_id`` is missing.
    """
    family_id = request.args.get('family_id')
    if not family_id:
        return jsonify({'error': '缺少family_id参数'}), 400

    conn = get_db()
    try:
        cursor = conn.cursor()
        cursor.execute('''
            SELECT
                m.*,
                p.time_windows,
                p.moods,
                p.occasions,
                p.cooldown,
                p.priority,
                p.play_count,
                p.last_played_at,
                GROUP_CONCAT(t.tag) as tags
            FROM media m
            LEFT JOIN media_policies p ON m.id = p.media_id
            LEFT JOIN media_tags t ON m.id = t.media_id
            WHERE m.family_id = ? AND m.is_active = 1
            GROUP BY m.id
            ORDER BY m.created_at DESC
        ''', (family_id,))
        rows = [dict(row) for row in cursor.fetchall()]
    finally:
        # Release the connection even when the query raises.
        conn.close()

    media_list = []
    for media_dict in rows:
        # GROUP_CONCAT yields a comma-joined string, or NULL without tags.
        media_dict['tags'] = media_dict['tags'].split(',') if media_dict['tags'] else []

        # Decode the JSON policy columns; only decoding errors fall back
        # to an empty list, anything else is allowed to surface.
        for field in ('time_windows', 'moods', 'occasions'):
            try:
                media_dict[field] = json.loads(media_dict[field]) if media_dict[field] else []
            except (TypeError, ValueError):
                media_dict[field] = []
        media_list.append(media_dict)

    return jsonify({'media': media_list})
@app.route('/api/family/media/<int:media_id>', methods=['GET'])
def get_media_detail(media_id):
    """Return one media item with its policy, tags and play statistics.

    Args:
        media_id: Primary key of the media item, taken from the URL.

    Returns:
        JSON with the media columns, policy columns (JSON fields decoded,
        empty list on missing or malformed data), ``tags`` and
        ``statistics`` (``total_plays``, ``likes``, ``dislikes``), or a
        404 error when the media item does not exist.
    """
    conn = get_db()
    try:
        cursor = conn.cursor()
        cursor.execute('''
            SELECT
                m.*,
                p.time_windows,
                p.moods,
                p.occasions,
                p.cooldown,
                p.priority,
                p.play_count,
                p.last_played_at
            FROM media m
            LEFT JOIN media_policies p ON m.id = p.media_id
            WHERE m.id = ?
        ''', (media_id,))
        row = cursor.fetchone()
        if not row:
            return jsonify({'error': '媒体不存在'}), 404
        media_dict = dict(row)

        cursor.execute('SELECT tag FROM media_tags WHERE media_id = ?', (media_id,))
        media_dict['tags'] = [tag_row['tag'] for tag_row in cursor.fetchall()]

        # Decode the JSON policy columns; only decoding errors fall back
        # to an empty list.
        for field in ('time_windows', 'moods', 'occasions'):
            try:
                media_dict[field] = json.loads(media_dict[field]) if media_dict[field] else []
            except (TypeError, ValueError):
                media_dict[field] = []

        # NOTE(review): the join pairs every play with the same elder's
        # feedback row, so likes/dislikes scale with the play count —
        # confirm that this is the intended metric.
        cursor.execute('''
            SELECT
                COUNT(*) as total_plays,
                SUM(CASE WHEN feedback_type = 'like' THEN 1 ELSE 0 END) as likes,
                SUM(CASE WHEN feedback_type = 'dislike' THEN 1 ELSE 0 END) as dislikes
            FROM media_play_history mph
            LEFT JOIN media_feedback mf ON mph.media_id = mf.media_id AND mph.elderly_id = mf.elderly_id
            WHERE mph.media_id = ?
        ''', (media_id,))
        # SUM() is NULL when there is no play history; report 0 instead.
        media_dict['statistics'] = {
            key: (value or 0) for key, value in dict(cursor.fetchone()).items()
        }
    finally:
        # Single cleanup point for the 404, success and error paths.
        conn.close()

    return jsonify(media_dict)
@app.route('/api/family/media/<int:media_id>', methods=['PUT'])
def update_media(media_id):
    """Update a media item's basic info, tags and trigger policy.

    Args:
        media_id: Primary key of the media item, taken from the URL.

    Request JSON (all keys optional):
        title, description: Basic info stored on the ``media`` row.
        tags: List of tags; replaces every existing tag.
        time_windows, moods, occasions: Policy fields, stored as JSON.
        cooldown, priority: Policy fields, stored as given.

    Returns:
        JSON ``{'success': True}``, or a 400 error when the body is not a
        JSON object.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # Previously a missing body crashed on the membership test.
        return jsonify({'error': '参数错误'}), 400

    conn = get_db()
    try:
        cursor = conn.cursor()

        # Basic info. Column names come from this fixed list, never from
        # the request, so the f-string SQL below only contains constants.
        update_fields = []
        params = []
        for column in ('title', 'description'):
            if column in data:
                update_fields.append(f'{column} = ?')
                params.append(data[column])
        if update_fields:
            update_fields.append('updated_at = CURRENT_TIMESTAMP')
            params.append(media_id)
            cursor.execute(f'''
                UPDATE media
                SET {', '.join(update_fields)}
                WHERE id = ?
            ''', params)

        # Tags: replace the whole set.
        if 'tags' in data:
            cursor.execute('DELETE FROM media_tags WHERE media_id = ?', (media_id,))
            # A null tag list clears the tags instead of raising.
            cursor.executemany('''
                INSERT INTO media_tags (media_id, tag)
                VALUES (?, ?)
            ''', [(media_id, tag) for tag in (data['tags'] or [])])

        # Trigger policy.
        json_fields = ('time_windows', 'moods', 'occasions')
        policy_updates = []
        policy_params = []
        for field in json_fields + ('cooldown', 'priority'):
            if field in data:
                policy_updates.append(f'{field} = ?')
                # List-valued fields are stored as JSON text.
                value = data[field]
                policy_params.append(json.dumps(value) if field in json_fields else value)
        if policy_updates:
            policy_updates.append('updated_at = CURRENT_TIMESTAMP')
            policy_params.append(media_id)
            cursor.execute(f'''
                UPDATE media_policies
                SET {', '.join(policy_updates)}
                WHERE media_id = ?
            ''', policy_params)

        conn.commit()
    finally:
        # Release the connection even when a statement raises; uncommitted
        # changes are discarded with it.
        conn.close()

    return jsonify({'success': True})
@app.route('/api/family/media/<int:media_id>', methods=['DELETE'])
def delete_media(media_id):
    """Soft-delete a media item by clearing its ``is_active`` flag.

    Args:
        media_id: Primary key of the media item, taken from the URL.

    Returns:
        JSON ``{'success': True}``; the row itself is kept.
    """
    deactivate_sql = '''
        UPDATE media
        SET is_active = 0, updated_at = CURRENT_TIMESTAMP
        WHERE id = ?
    '''
    db = get_db()
    cur = db.cursor()
    cur.execute(deactivate_sql, (media_id,))
    db.commit()
    db.close()
    result = {'success': True}
    return jsonify(result)
# ==================== 老人端媒体 API ====================
def _policy_json_list(raw):
    """Decode a JSON policy column; missing, malformed or non-list data gives []."""
    if not raw:
        return []
    try:
        value = json.loads(raw)
    except (TypeError, ValueError):
        return []
    return value if isinstance(value, list) else []


def _time_in_windows(current_time, windows):
    """Tell whether ``current_time`` ('HH:MM') falls inside any 'HH:MM-HH:MM' window.

    An empty window list matches every time. A window whose start is later
    than its end (for example '22:00-06:00') is treated as crossing midnight.
    """
    if not windows:
        return True
    for window in windows:
        if not isinstance(window, str) or '-' not in window:
            continue
        start, end = (part.strip() for part in window.split('-', 1))
        if start <= end:
            if start <= current_time <= end:
                return True
        elif current_time >= start or current_time <= end:
            return True
    return False


@app.route('/api/elderly/media/recommended', methods=['GET'])
def get_recommended_media():
    """Return the media currently eligible for playback on the elderly side.

    Active media of the family are read in policy order (priority
    descending, play count ascending) and filtered by cooldown, time
    window, mood and occasion.

    Query parameters:
        family_id: Required.
        mood: Optional current mood; an empty value skips the mood filter.
        occasion: Optional occasion; an empty value skips that filter.

    Returns:
        JSON ``{'media': [...]}`` with ``tags`` as a list (the policy JSON
        columns are returned as stored), or a 400 error when ``family_id``
        is missing.
    """
    family_id = request.args.get('family_id')
    current_mood = request.args.get('mood', '')
    occasion = request.args.get('occasion', '')

    if not family_id:
        return jsonify({'error': '缺少family_id参数'}), 400

    # last_played_at is written with CURRENT_TIMESTAMP, which SQLite stores
    # in UTC, so the cooldown must be measured against UTC "now" rather
    # than the host's local clock.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    # Time windows are wall-clock times, so use the file's Beijing-time
    # helper. NOTE(review): assumes windows are entered in Beijing time.
    current_time = get_beijing_time().strftime('%H:%M')

    conn = get_db()
    try:
        cursor = conn.cursor()
        cursor.execute('''
            SELECT
                m.*,
                p.time_windows,
                p.moods,
                p.occasions,
                p.cooldown,
                p.priority,
                p.play_count,
                p.last_played_at,
                GROUP_CONCAT(t.tag) as tags
            FROM media m
            INNER JOIN media_policies p ON m.id = p.media_id
            LEFT JOIN media_tags t ON m.id = t.media_id
            WHERE m.family_id = ? AND m.is_active = 1
            GROUP BY m.id
            ORDER BY p.priority DESC, p.play_count ASC
        ''', (family_id,))
        rows = [dict(row) for row in cursor.fetchall()]
    finally:
        # Release the connection even when the query raises.
        conn.close()

    recommended = []
    for media_dict in rows:
        time_windows = _policy_json_list(media_dict['time_windows'])
        moods = _policy_json_list(media_dict['moods'])
        occasions = _policy_json_list(media_dict['occasions'])

        # Cooldown: skip media played too recently.
        if media_dict['last_played_at']:
            try:
                last_played = datetime.fromisoformat(media_dict['last_played_at'])
            except ValueError:
                last_played = None  # unreadable timestamp: no cooldown applied
            if last_played is not None:
                if last_played.tzinfo is not None:
                    last_played = last_played.astimezone(timezone.utc).replace(tzinfo=None)
                # A NULL cooldown is treated as "no cooldown".
                cooldown = timedelta(minutes=media_dict['cooldown'] or 0)
                if now_utc - last_played < cooldown:
                    continue

        if not _time_in_windows(current_time, time_windows):
            continue

        # An unset policy list or an unspecified request value matches.
        if moods and current_mood and current_mood not in moods:
            continue
        if occasions and occasion and occasion not in occasions:
            continue

        media_dict['tags'] = media_dict['tags'].split(',') if media_dict['tags'] else []
        recommended.append(media_dict)

    return jsonify({'media': recommended})
@app.route('/api/elderly/media/<int:media_id>/play', methods=['POST'])
def record_media_play(media_id):
    """Record one playback of a media item and update its play counters.

    Args:
        media_id: Primary key of the media item, taken from the URL.

    Request JSON:
        elderly_id: Required.
        duration_watched: Optional, default 0.
        completed: Optional, default 0.
        triggered_by: Optional, default ``'manual'``.
        mood_before, mood_after: Optional, default empty string.

    Returns:
        JSON ``{'success': True}``, or a 400 error when ``elderly_id`` is
        missing (including a missing or non-object body).
    """
    # A missing or non-object body is handled as "no fields supplied"
    # instead of crashing on None.get().
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    elderly_id = data.get('elderly_id')
    if not elderly_id:
        return jsonify({'error': '缺少elderly_id'}), 400

    history_values = (
        media_id,
        elderly_id,
        data.get('duration_watched', 0),
        data.get('completed', 0),
        data.get('triggered_by', 'manual'),
        data.get('mood_before', ''),
        data.get('mood_after', ''),
    )

    conn = get_db()
    try:
        cursor = conn.cursor()
        cursor.execute('''
            INSERT INTO media_play_history (
                media_id, elderly_id, duration_watched, completed,
                triggered_by, mood_before, mood_after
            ) VALUES (?, ?, ?, ?, ?, ?, ?)
        ''', history_values)

        # Bump the play counter and remember when the item was last played
        # (the recommendation endpoint uses it for the cooldown).
        cursor.execute('''
            UPDATE media_policies
            SET play_count = play_count + 1,
                last_played_at = CURRENT_TIMESTAMP
            WHERE media_id = ?
        ''', (media_id,))
        conn.commit()
    finally:
        # Release the connection even when a statement raises.
        conn.close()

    return jsonify({'success': True})
@app.route('/api/elderly/media/<int:media_id>/feedback', methods=['POST'])
def submit_media_feedback(media_id):
    """Store a like/dislike from an elderly user for a media item.

    Args:
        media_id: Primary key of the media item, taken from the URL.

    Request JSON:
        elderly_id: Required.
        feedback_type: Required, ``'like'`` or ``'dislike'``.

    Returns:
        JSON ``{'success': True}``, or a 400 error when a parameter is
        missing or invalid (including a missing or non-object body).
    """
    # A missing or non-object body is handled as "no fields supplied"
    # instead of crashing on None.get().
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    elderly_id = data.get('elderly_id')
    feedback_type = data.get('feedback_type')
    if not elderly_id or feedback_type not in ('like', 'dislike'):
        return jsonify({'error': '参数错误'}), 400

    conn = get_db()
    try:
        cursor = conn.cursor()
        # INSERT OR REPLACE handles repeated feedback from the same user.
        cursor.execute('''
            INSERT OR REPLACE INTO media_feedback (media_id, elderly_id, feedback_type)
            VALUES (?, ?, ?)
        ''', (media_id, elderly_id, feedback_type))
        conn.commit()
    finally:
        # Release the connection even when the insert raises.
        conn.close()

    return jsonify({'success': True})
@app.route('/api/elderly/media/history', methods=['GET'])
def get_media_history():
    """Return an elderly user's media play history, newest first.

    Query parameters:
        elderly_id: Required.
        limit: Maximum number of rows (default 50; a non-integer value
            falls back to the default).

    Returns:
        JSON ``{'history': [...]}`` where each entry holds the play-history
        columns plus the media title, type, file path and the user's
        feedback type; a 400 error when ``elderly_id`` is missing.
    """
    elderly_id = request.args.get('elderly_id')
    # Parse as int, as the other paginated endpoints do, so a non-numeric
    # value never reaches the SQL LIMIT clause.
    limit = request.args.get('limit', 50, type=int)

    if not elderly_id:
        return jsonify({'error': '缺少elderly_id参数'}), 400

    conn = get_db()
    try:
        cursor = conn.cursor()
        cursor.execute('''
            SELECT
                mph.*,
                m.title,
                m.media_type,
                m.file_path,
                mf.feedback_type
            FROM media_play_history mph
            INNER JOIN media m ON mph.media_id = m.id
            LEFT JOIN media_feedback mf ON mph.media_id = mf.media_id AND mph.elderly_id = mf.elderly_id
            WHERE mph.elderly_id = ?
            ORDER BY mph.played_at DESC
            LIMIT ?
        ''', (elderly_id, limit))
        history = [dict(row) for row in cursor.fetchall()]
    finally:
        # Release the connection even when the query raises.
        conn.close()

    return jsonify({'history': history})
@app.route('/api/family/media/recent-plays', methods=['GET'])
def get_recent_plays():
    """Return the most recent media plays of a family (family-side view).

    Query parameters:
        family_id: Required.
        limit: Maximum number of rows (default 10; a non-integer value
            falls back to the default).

    Returns:
        JSON ``{'recent_plays': [...]}`` with one entry per media item and
        play time, carrying id, title, type, thumbnail path, play time and
        like/dislike counts; a 400 error when ``family_id`` is missing.
    """
    family_id = request.args.get('family_id')
    # Parse as int, as the other paginated endpoints do, so a non-numeric
    # value never reaches the SQL LIMIT clause.
    limit = request.args.get('limit', 10, type=int)

    if not family_id:
        return jsonify({'error': '缺少family_id参数'}), 400

    conn = get_db()
    try:
        cursor = conn.cursor()
        cursor.execute('''
            SELECT
                m.id,
                m.title,
                m.media_type,
                m.thumbnail_path,
                mph.played_at,
                COUNT(CASE WHEN mf.feedback_type = 'like' THEN 1 END) as likes,
                COUNT(CASE WHEN mf.feedback_type = 'dislike' THEN 1 END) as dislikes
            FROM media m
            INNER JOIN media_play_history mph ON m.id = mph.media_id
            LEFT JOIN media_feedback mf ON m.id = mf.media_id
            WHERE m.family_id = ?
            GROUP BY m.id, mph.played_at
            ORDER BY mph.played_at DESC
            LIMIT ?
        ''', (family_id, limit))
        recent = [dict(row) for row in cursor.fetchall()]
    finally:
        # Release the connection even when the query raises.
        conn.close()

    return jsonify({'recent_plays': recent})
# ==================== 静态文件服务 ====================
@app.route('/uploads/<path:filename>')
def serve_upload(filename):
    """Serve a file from the configured upload folder.

    Args:
        filename: Path below the upload folder, taken from the URL.
    """
    upload_root = app.config['UPLOAD_FOLDER']
    return send_from_directory(upload_root, filename)
# ==================== 数字人媒体展示 API ====================
@app.route('/api/elderly/show-media', methods=['POST'])
def show_media_on_avatar():
    """Ask the elderly client to show a media file while the avatar speaks.

    The media item is looked up by title, the speech text is pushed to the
    avatar service on port 5000, and a ``media_display`` row is inserted
    into ``family_alerts``; the polling endpoint for media events reads
    rows of that type.

    Request JSON:
        media_title: Required. Title used to find the media item.
        avatar_text: Required. Text the avatar should speak.
        duration: Optional display time in seconds (default 30).
        family_id: Optional, defaults to ``'family_001'``.

    Returns:
        JSON with ``success``, ``event_id`` and ``message`` with status
        201; 400 when a required field is missing; 404 when no active
        media has that title; 500 when the push or the insert fails.
    """
    # A missing or non-object body is reported as missing fields instead
    # of raising on the membership test.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    required_fields = ['media_title', 'avatar_text']
    if not all(field in data for field in required_fields):
        return jsonify({'error': '缺少必需字段: media_title 和 avatar_text'}), 400

    media_title = data['media_title']
    avatar_text = data['avatar_text']
    duration = data.get('duration', 30)

    conn = get_db()
    try:
        cursor = conn.cursor()
        cursor.execute('''
            SELECT id, media_type, file_path, title
            FROM media
            WHERE title = ? AND is_active = 1
            LIMIT 1
        ''', (media_title,))
        media_row = cursor.fetchone()
        if not media_row:
            return jsonify({'error': f'未找到标题为 "{media_title}" 的媒体文件'}), 404

        media_type = media_row['media_type']
        # Only the bare file name is sent to the client, not the full path.
        media_filename = os.path.basename(media_row['file_path'])

        try:
            # 1. Push the speech text to the avatar service.
            avatar_response = requests.post(
                'http://127.0.0.1:5000/transparent-pass',
                json={
                    'user': 'User',
                    'text': avatar_text
                },
                timeout=5
            )
            if not avatar_response.ok:
                print(f'推送数字人播报失败: {avatar_response.status_code}')

            # 2. Queue the display event for the elderly client.
            cursor.execute('''
                INSERT INTO family_alerts (
                    family_id, alert_type, level, title, message, metadata, source
                ) VALUES (?, ?, ?, ?, ?, ?, ?)
            ''', (
                data.get('family_id', 'family_001'),
                'media_display',
                'low',
                media_title,
                avatar_text,
                json.dumps({
                    'media_filename': media_filename,
                    'media_type': media_type,
                    'media_title': media_title,
                    'avatar_text': avatar_text,
                    'duration': duration,
                    'event_type': 'show_media'
                }),
                'system'
            ))
            event_id = cursor.lastrowid
            conn.commit()
            return jsonify({
                'success': True,
                'event_id': event_id,
                'message': '媒体展示请求已发送'
            }), 201
        except Exception as e:
            print(f'处理媒体展示请求失败: {e}')
            return jsonify({'error': str(e)}), 500
    finally:
        # Single cleanup point: the error path previously returned without
        # closing the connection.
        conn.close()
@app.route('/api/elderly/hide-media', methods=['POST'])
def hide_media_on_avatar():
    """Ask the elderly client to close the media window currently shown.

    Inserts a ``media_display`` row into ``family_alerts`` whose metadata
    carries ``event_type = 'hide_media'``.

    Request JSON:
        family_id: Optional, defaults to ``'family_001'``.

    Returns:
        JSON with ``success``, ``event_id`` and ``message`` with status
        201, or a 500 error carrying the exception text.
    """
    payload = request.json or {}
    family_id = payload.get('family_id', 'family_001')

    insert_sql = '''
        INSERT INTO family_alerts (
            family_id, alert_type, level, title, message, metadata, source
        ) VALUES (?, ?, ?, ?, ?, ?, ?)
    '''

    db = get_db()
    cur = db.cursor()
    try:
        # Same alert type as the show event; the metadata tells them apart.
        hide_metadata = json.dumps({'event_type': 'hide_media'})
        row_values = (
            family_id,
            'media_display',
            'low',
            '关闭媒体显示',
            '关闭当前显示的媒体',
            hide_metadata,
            'system',
        )
        cur.execute(insert_sql, row_values)
        event_id = cur.lastrowid
        db.commit()
        db.close()
        body = {
            'success': True,
            'event_id': event_id,
            'message': '关闭媒体请求已发送'
        }
        return jsonify(body), 201
    except Exception as e:
        print(f'处理关闭媒体请求失败: {e}')
        db.close()
        return jsonify({'error': str(e)}), 500
@app.route('/api/elderly/poll-media-events', methods=['GET'])
def poll_media_events():
    """Return the newest unread media-display event and mark it as read.

    Query parameters:
        family_id: Optional, defaults to ``'family_001'``.

    Returns:
        JSON ``{'event': <alert dict>}`` with ``metadata`` decoded from
        JSON (an empty dict when it cannot be decoded), or
        ``{'event': None}`` when nothing is pending.
    """
    family_id = request.args.get('family_id', 'family_001')

    conn = get_db()
    try:
        cursor = conn.cursor()
        # NOTE(review): newest-first means that with several unread events
        # the later ones are delivered before the earlier ones (a "hide"
        # can arrive before its "show") — confirm that this is intended.
        cursor.execute('''
            SELECT * FROM family_alerts
            WHERE family_id = ?
            AND alert_type = 'media_display'
            AND read = 0
            AND is_active = 1
            ORDER BY created_at DESC
            LIMIT 1
        ''', (family_id,))
        row = cursor.fetchone()
        if not row:
            return jsonify({'event': None})

        alert = dict(row)
        if alert['metadata']:
            try:
                alert['metadata'] = json.loads(alert['metadata'])
            except (TypeError, ValueError):
                # Only decoding problems fall back to empty metadata.
                alert['metadata'] = {}

        # Mark the event as read so it is delivered once.
        cursor.execute('''
            UPDATE family_alerts
            SET read = 1, read_at = CURRENT_TIMESTAMP
            WHERE id = ?
        ''', (alert['id'],))
        conn.commit()
        return jsonify({'event': alert})
    finally:
        # Single cleanup point for both outcomes and for errors.
        conn.close()
# ==================== 老人端Toast通知 ====================
# Module-level store of toasts waiting to be shown (kept in memory only, so it is lost on restart)
pending_toasts = {} # key: family_id, value: list of toast objects
# Registry of connected SSE clients
sse_clients = {} # key: family_id, value: list of response queues
# Upper bound on the per-family polling backlog. Toasts delivered over SSE
# are never removed from that backlog, so without a cap it would grow for
# as long as the process runs.
_MAX_PENDING_TOASTS = 100


@app.route('/api/elderly/toast', methods=['POST'])
def create_toast():
    """Create a toast notification for the elderly client.

    The toast is queued for the polling endpoint and also pushed to every
    SSE client registered for the family.

    Request JSON:
        family_id: Required.
        message: Required.
        type: Optional toast type (``success``, ``info``, ``calling``),
            default ``'info'``.
        duration: Optional display time in milliseconds, default 3000.

    Returns:
        JSON ``{'success': True, 'toast_id': <id>}`` with status 201, or a
        400 error when a required parameter is missing (including a
        missing or non-object body).
    """
    # A missing or non-object body is handled as "no fields supplied"
    # instead of crashing on None.get().
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    family_id = data.get('family_id')
    toast_type = data.get('type', 'info')
    message = data.get('message')
    duration = data.get('duration', 3000)

    if not family_id or not message:
        return jsonify({'error': '缺少必需参数'}), 400

    toast = {
        'id': int(time.time() * 1000),  # millisecond timestamp used as the ID
        'type': toast_type,
        'message': message,
        'duration': duration,
        'created_at': datetime.now().isoformat()
    }

    # Queue for the polling fallback, keeping only the newest entries.
    backlog = pending_toasts.setdefault(family_id, [])
    backlog.append(toast)
    del backlog[:-_MAX_PENDING_TOASTS]

    # Push to connected SSE clients. Iterate over a snapshot because a
    # disconnecting stream removes its queue from the registry.
    for client_queue in list(sse_clients.get(family_id, ())):
        try:
            client_queue.put(toast)
        except Exception as exc:
            # Best effort: one failing client must not block the others.
            print(f'推送Toast到SSE客户端失败: {exc}')

    return jsonify({'success': True, 'toast_id': toast['id']}), 201
@app.route('/api/elderly/toast/poll', methods=['GET'])
def poll_toast():
    """Polling fallback: hand out the oldest pending toast for a family.

    Query parameters:
        family_id: Required.

    Returns:
        JSON ``{'toast': <toast>}`` with the first queued toast (removed
        from the queue), ``{'toast': None}`` when nothing is pending, or a
        400 error when ``family_id`` is missing.
    """
    family_id = request.args.get('family_id')
    if not family_id:
        return jsonify({'error': '缺少family_id参数'}), 400

    backlog = pending_toasts.get(family_id, [])
    # Take from the front of the list so toasts are delivered in creation order.
    next_toast = backlog.pop(0) if backlog else None
    return jsonify({'toast': next_toast})
@app.route('/api/elderly/toast/stream', methods=['GET'])
def toast_stream():
    """SSE endpoint that streams toast notifications to one client.

    Query parameters:
        family_id: Required.

    Returns:
        A ``text/event-stream`` response that first emits a ``connected``
        message, then one ``data:`` frame per toast and a comment-line
        heartbeat after every 30 seconds without a toast; a 400 error when
        ``family_id`` is missing.
    """
    family_id = request.args.get('family_id')
    if not family_id:
        return jsonify({'error': '缺少family_id参数'}), 400

    def generate():
        import queue

        # Each connected client gets its own queue, registered under the family.
        inbox = queue.Queue()
        sse_clients.setdefault(family_id, []).append(inbox)

        try:
            # Greeting frame confirming the connection.
            yield f"data: {json.dumps({'type': 'connected'})}\n\n"

            while True:
                try:
                    # Block for up to 30 seconds waiting for the next toast.
                    payload = inbox.get(timeout=30)
                except queue.Empty:
                    # Nothing arrived: send a heartbeat to keep the connection open.
                    yield ": heartbeat\n\n"
                else:
                    yield f"data: {json.dumps(payload)}\n\n"
        finally:
            # Unregister this client when the stream ends.
            listeners = sse_clients.get(family_id)
            if listeners is not None:
                listeners.remove(inbox)
                if not listeners:
                    del sse_clients[family_id]

    stream_headers = {
        'Cache-Control': 'no-cache',
        'X-Accel-Buffering': 'no',
        'Connection': 'keep-alive'
    }
    return Response(
        generate(),
        mimetype='text/event-stream',
        headers=stream_headers
    )
# ==================== 健康检查 ====================
@app.route('/api/health', methods=['GET'])
def health_check():
    """Health-check endpoint: report ``ok`` and the current server time."""
    payload = {
        'status': 'ok',
        'timestamp': datetime.now().isoformat(),
    }
    return jsonify(payload)
if __name__ == '__main__':
    # Initialise the database before serving (init_db is defined elsewhere
    # in this file).
    init_db()
    print("数据库初始化完成")

    # Start the server on all interfaces, port 8000, with debug mode on.
    # NOTE(review): debug=True together with host 0.0.0.0 makes the
    # interactive debugger reachable from the network — confirm this entry
    # point is used for development only.
    server_options = {'host': '0.0.0.0', 'port': 8000, 'debug': True}
    app.run(**server_options)