Files
qinqinghuisheng/server/app.py

2426 lines
73 KiB
Python
Raw Normal View History

2025-12-13 14:46:05 +08:00
"""
KinEcho 服务端 - Flask应用主入口
支持家人端和老人端的日程同步管理
"""
from flask import Flask, request, jsonify, send_from_directory, Response
from flask_cors import CORS
from datetime import datetime, timedelta, timezone
import sqlite3
import os
import json
import time
import requests
from werkzeug.utils import secure_filename
app = Flask(__name__)
CORS(app) # 允许跨域请求
# 北京时区 (UTC+8)
BEIJING_TZ = timezone(timedelta(hours=8))
def get_beijing_time():
"""获取当前北京时间"""
return datetime.now(BEIJING_TZ)
def utc_to_beijing(utc_str):
"""将UTC时间字符串转换为北京时间字符串"""
if not utc_str:
return None
try:
# 解析UTC时间SQLite的CURRENT_TIMESTAMP格式
utc_dt = datetime.strptime(utc_str, '%Y-%m-%d %H:%M:%S')
# 添加UTC时区信息
utc_dt = utc_dt.replace(tzinfo=timezone.utc)
# 转换为北京时间
beijing_dt = utc_dt.astimezone(BEIJING_TZ)
# 返回不带时区信息的字符串
return beijing_dt.strftime('%Y-%m-%d %H:%M:%S')
except:
return utc_str
# 数据库配置
DB_PATH = os.path.join(os.path.dirname(__file__), 'kinecho.db')
# 文件上传配置
UPLOAD_FOLDER = os.path.join(os.path.dirname(__file__), 'uploads')
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'mp4', 'mov', 'avi'}
MAX_FILE_SIZE = 100 * 1024 * 1024 # 100MB
# 确保上传目录存在
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(os.path.join(UPLOAD_FOLDER, 'thumbnails'), exist_ok=True)
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['MAX_CONTENT_LENGTH'] = MAX_FILE_SIZE
def allowed_file(filename):
"""检查文件扩展名是否允许"""
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
def generate_video_thumbnail(video_path, filename):
"""使用ffmpeg生成视频缩略图"""
import subprocess
thumbnail_filename = filename.rsplit('.', 1)[0] + '_thumb.jpg'
thumbnail_path = os.path.join(UPLOAD_FOLDER, 'thumbnails', thumbnail_filename)
try:
# 使用ffmpeg截取第1秒的帧作为缩略图
cmd = [
'ffmpeg', '-i', video_path,
'-ss', '00:00:01',
'-vframes', '1',
'-vf', 'scale=320:-1',
'-y', thumbnail_path
]
subprocess.run(cmd, capture_output=True, check=True, timeout=30)
return thumbnail_path
except Exception as e:
print(f'生成视频缩略图失败: {e}')
return None
def generate_photo_thumbnail(photo_path, filename):
"""生成图片缩略图"""
from PIL import Image
thumbnail_filename = filename.rsplit('.', 1)[0] + '_thumb.jpg'
thumbnail_path = os.path.join(UPLOAD_FOLDER, 'thumbnails', thumbnail_filename)
try:
with Image.open(photo_path) as img:
# 转换为RGB处理PNG等格式
if img.mode in ('RGBA', 'P'):
img = img.convert('RGB')
# 生成缩略图,保持比例
img.thumbnail((320, 320))
img.save(thumbnail_path, 'JPEG', quality=85)
return thumbnail_path
except Exception as e:
print(f'生成图片缩略图失败: {e}')
return None
def get_db():
"""获取数据库连接"""
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row # 返回字典格式
return conn
def init_db():
"""初始化数据库"""
conn = get_db()
cursor = conn.cursor()
# 用户表
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_type TEXT NOT NULL, -- 'family' 'elderly'
name TEXT NOT NULL,
phone TEXT,
family_id TEXT, -- 家庭组ID关联家人和老人
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# 日程/护理计划表
cursor.execute('''
CREATE TABLE IF NOT EXISTS schedules (
id INTEGER PRIMARY KEY AUTOINCREMENT,
family_id TEXT NOT NULL, -- 家庭组ID
title TEXT NOT NULL, -- 日程标题
description TEXT, -- 详细描述
schedule_type TEXT, -- 类型medication(用药)exercise(运动)meal(饮食)checkup(检查)
schedule_time TIMESTAMP NOT NULL, -- 日程时间
repeat_type TEXT DEFAULT 'once', -- 重复类型once, daily, weekly, monthly
repeat_days TEXT, -- 重复的星期几JSON格式[1,3,5]
status TEXT DEFAULT 'pending', -- 状态pending(待执行), completed(已完成), skipped(已放弃), missed(已错过)
completed_at TIMESTAMP, -- 完成时间
auto_remind INTEGER DEFAULT 1, -- 数字人自动播报1=启用0=禁用
is_active INTEGER DEFAULT 1, -- 是否启用
created_by INTEGER, -- 创建者用户ID
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (created_by) REFERENCES users(id)
)
''')
# 提醒记录表
cursor.execute('''
CREATE TABLE IF NOT EXISTS reminders (
id INTEGER PRIMARY KEY AUTOINCREMENT,
schedule_id INTEGER NOT NULL,
elderly_id INTEGER NOT NULL, -- 老人用户ID
remind_time TIMESTAMP NOT NULL, -- 提醒时间
status TEXT DEFAULT 'pending', -- pending, completed, missed, dismissed
completed_at TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (schedule_id) REFERENCES schedules(id),
FOREIGN KEY (elderly_id) REFERENCES users(id)
)
''')
# 媒体文件表
cursor.execute('''
CREATE TABLE IF NOT EXISTS media (
id INTEGER PRIMARY KEY AUTOINCREMENT,
family_id TEXT NOT NULL, -- 家庭组ID
media_type TEXT NOT NULL, -- 'photo' 'video'
title TEXT NOT NULL, -- 媒体标题
description TEXT, -- 描述
file_path TEXT NOT NULL, -- 文件存储路径
file_size INTEGER, -- 文件大小字节
duration INTEGER, -- 视频时长仅视频有值
thumbnail_path TEXT, -- 缩略图路径
uploaded_by INTEGER, -- 上传者用户ID
is_active INTEGER DEFAULT 1, -- 是否启用
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (uploaded_by) REFERENCES users(id)
)
''')
# 媒体标签表
cursor.execute('''
CREATE TABLE IF NOT EXISTS media_tags (
id INTEGER PRIMARY KEY AUTOINCREMENT,
media_id INTEGER NOT NULL,
tag TEXT NOT NULL, -- 标签内容 '孙女小米', '生日', '旅行'
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (media_id) REFERENCES media(id) ON DELETE CASCADE
)
''')
# 媒体触发策略表
cursor.execute('''
CREATE TABLE IF NOT EXISTS media_policies (
id INTEGER PRIMARY KEY AUTOINCREMENT,
media_id INTEGER NOT NULL,
time_windows TEXT, -- 播放时段JSON格式["07:00-09:00", "19:00-21:00"]
moods TEXT, -- 适合心境JSON格式["happy", "sad", "calm"]
occasions TEXT, -- 特殊场合JSON格式["birthday", "anniversary"]
cooldown INTEGER DEFAULT 60, -- 冷却时间分钟避免重复播放
priority INTEGER DEFAULT 5, -- 优先级 1-10数字越大优先级越高
last_played_at TIMESTAMP, -- 上次播放时间
play_count INTEGER DEFAULT 0, -- 播放次数
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (media_id) REFERENCES media(id) ON DELETE CASCADE
)
''')
# 媒体播放历史表
cursor.execute('''
CREATE TABLE IF NOT EXISTS media_play_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
media_id INTEGER NOT NULL,
elderly_id INTEGER NOT NULL, -- 老人用户ID
played_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, -- 播放时间
duration_watched INTEGER, -- 观看时长
completed INTEGER DEFAULT 0, -- 是否看完1=0=
triggered_by TEXT, -- 触发方式'auto'=自动, 'manual'=手动, 'mood'=情绪触发等
mood_before TEXT, -- 播放前情绪状态
mood_after TEXT, -- 播放后情绪状态
FOREIGN KEY (media_id) REFERENCES media(id),
FOREIGN KEY (elderly_id) REFERENCES users(id)
)
''')
# 媒体反馈表(点赞/点踩)
cursor.execute('''
CREATE TABLE IF NOT EXISTS media_feedback (
id INTEGER PRIMARY KEY AUTOINCREMENT,
media_id INTEGER NOT NULL,
elderly_id INTEGER NOT NULL, -- 老人用户ID
feedback_type TEXT NOT NULL, -- 'like' 'dislike'
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (media_id) REFERENCES media(id),
FOREIGN KEY (elderly_id) REFERENCES users(id),
UNIQUE(media_id, elderly_id) -- 每个老人对每个媒体只能有一个反馈
)
''')
# 家属留言表
cursor.execute('''
CREATE TABLE IF NOT EXISTS family_messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
family_id TEXT NOT NULL, -- 家庭组ID
content TEXT NOT NULL, -- 留言内容
sender_name TEXT NOT NULL, -- 发送者姓名
sender_relation TEXT NOT NULL, -- 发送者称呼儿子女儿孙女等
scheduled_time TIMESTAMP NOT NULL, -- 预约播报时间
played INTEGER DEFAULT 0, -- 是否已播放0=未播放1=已播放
played_at TIMESTAMP, -- 实际播报时间
liked INTEGER DEFAULT 0, -- 老人是否点赞0=未点赞1=已点赞
is_active INTEGER DEFAULT 1, -- 是否有效
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# 家属端消息/告警表(优化版)
cursor.execute('''
CREATE TABLE IF NOT EXISTS family_alerts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
family_id TEXT NOT NULL, -- 家庭组ID
elderly_id INTEGER, -- 老人用户ID可选用于关联具体老人
alert_type TEXT NOT NULL, -- 消息类型sos_emergency, contact_family, medication, emotion, inactive, emergency
level TEXT NOT NULL, -- 级别low, medium, high
title TEXT, -- 消息标题简短概要
message TEXT NOT NULL, -- 消息详细内容
metadata TEXT, -- 额外元数据JSON格式 {"location": "客厅", "device": "平板"}
source TEXT DEFAULT 'elderly', -- 消息来源elderly(老人端), system(系统自动), family(家属端)
handled INTEGER DEFAULT 0, -- 是否已处理0=未处理1=已处理
handled_at TIMESTAMP, -- 处理时间
handled_by INTEGER, -- 处理人用户ID
reply_message TEXT, -- 家属回复内容
read INTEGER DEFAULT 0, -- 是否已读0=未读1=已读
read_at TIMESTAMP, -- 阅读时间
is_active INTEGER DEFAULT 1, -- 是否有效
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (elderly_id) REFERENCES users(id),
FOREIGN KEY (handled_by) REFERENCES users(id)
)
''')
# 情绪记录表
cursor.execute('''
CREATE TABLE IF NOT EXISTS mood_records (
id INTEGER PRIMARY KEY AUTOINCREMENT,
family_id TEXT NOT NULL, -- 家庭组ID
elderly_id INTEGER, -- 老人用户ID
mood_type TEXT NOT NULL, -- 情绪类型happy(开心), calm(平静), sad(难过), anxious(焦虑), angry(生气), tired(疲惫)
mood_score INTEGER DEFAULT 5, -- 情绪分数 1-10数字越大越积极
note TEXT, -- 备注说明
source TEXT DEFAULT 'manual', -- 来源manual(手动记录), ai_detect(AI检测), voice(语音分析)
trigger_event TEXT, -- 触发事件 '看了家人照片', '完成了散步'
location TEXT, -- 记录地点
weather TEXT, -- 天气情况
recorded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, -- 记录时间
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (elderly_id) REFERENCES users(id)
)
''')
# 情绪记录索引
cursor.execute('''
CREATE INDEX IF NOT EXISTS idx_mood_records_family_id
ON mood_records(family_id)
''')
cursor.execute('''
CREATE INDEX IF NOT EXISTS idx_mood_records_elderly_id
ON mood_records(elderly_id)
''')
cursor.execute('''
CREATE INDEX IF NOT EXISTS idx_mood_records_recorded_at
ON mood_records(recorded_at DESC)
''')
# 创建索引以提高查询性能
cursor.execute('''
CREATE INDEX IF NOT EXISTS idx_family_alerts_family_id
ON family_alerts(family_id)
''')
cursor.execute('''
CREATE INDEX IF NOT EXISTS idx_family_alerts_created_at
ON family_alerts(created_at DESC)
''')
cursor.execute('''
CREATE INDEX IF NOT EXISTS idx_family_alerts_handled
ON family_alerts(handled, created_at DESC)
''')
conn.commit()
conn.close()
# ==================== 家人端 API ====================
@app.route('/api/family/schedules', methods=['GET'])
def get_family_schedules():
"""获取家庭所有日程"""
family_id = request.args.get('family_id')
if not family_id:
return jsonify({'error': '缺少family_id参数'}), 400
conn = get_db()
cursor = conn.cursor()
cursor.execute('''
SELECT s.*, u.name as creator_name
FROM schedules s
LEFT JOIN users u ON s.created_by = u.id
WHERE s.family_id = ? AND s.is_active = 1
ORDER BY s.schedule_time DESC
''', (family_id,))
schedules = [dict(row) for row in cursor.fetchall()]
conn.close()
return jsonify({'schedules': schedules})
@app.route('/api/family/schedules', methods=['POST'])
def create_schedule():
"""创建新日程"""
data = request.json
required_fields = ['family_id', 'title', 'schedule_time']
if not all(field in data for field in required_fields):
return jsonify({'error': '缺少必需字段'}), 400
conn = get_db()
cursor = conn.cursor()
cursor.execute('''
INSERT INTO schedules (
family_id, title, description, schedule_type,
schedule_time, repeat_type, repeat_days, auto_remind, created_by
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
data['family_id'],
data['title'],
data.get('description', ''),
data.get('schedule_type', 'other'),
data['schedule_time'],
data.get('repeat_type', 'once'),
data.get('repeat_days', ''),
data.get('auto_remind', 1),
data.get('created_by')
))
schedule_id = cursor.lastrowid
conn.commit()
conn.close()
return jsonify({'success': True, 'schedule_id': schedule_id}), 201
@app.route('/api/family/schedules/<int:schedule_id>', methods=['PUT'])
def update_schedule(schedule_id):
"""更新日程"""
data = request.json
conn = get_db()
cursor = conn.cursor()
# 构建更新语句
update_fields = []
params = []
for field in ['title', 'description', 'schedule_type', 'schedule_time', 'repeat_type', 'repeat_days', 'auto_remind', 'status']:
if field in data:
update_fields.append(f"{field} = ?")
params.append(data[field])
if not update_fields:
return jsonify({'error': '没有要更新的字段'}), 400
update_fields.append("updated_at = CURRENT_TIMESTAMP")
params.append(schedule_id)
cursor.execute(f'''
UPDATE schedules
SET {', '.join(update_fields)}
WHERE id = ?
''', params)
conn.commit()
conn.close()
return jsonify({'success': True})
@app.route('/api/family/schedules/<int:schedule_id>', methods=['DELETE'])
def delete_schedule(schedule_id):
"""删除日程(软删除)"""
conn = get_db()
cursor = conn.cursor()
cursor.execute('''
UPDATE schedules
SET is_active = 0, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
''', (schedule_id,))
conn.commit()
conn.close()
return jsonify({'success': True})
# ==================== 家属端消息/告警 API ====================
@app.route('/api/family/alerts', methods=['GET'])
def get_family_alerts():
"""获取家庭所有消息/告警"""
family_id = request.args.get('family_id')
status = request.args.get('status') # all, unhandled, handled
handled = request.args.get('handled') # true/false 布尔值
read = request.args.get('read') # true/false 布尔值
alert_type = request.args.get('alert_type') # 消息类型
elderly_id = request.args.get('elderly_id', type=int) # 老人ID
level = request.args.get('level') # low, medium, high
limit = request.args.get('limit', 100, type=int)
offset = request.args.get('offset', 0, type=int)
if not family_id:
return jsonify({'error': '缺少family_id参数'}), 400
conn = get_db()
cursor = conn.cursor()
# 构建查询条件
conditions = ['a.family_id = ?', 'a.is_active = 1']
params = [family_id]
# 排除媒体展示事件(这些只用于老人端轮询,不应出现在家属端通知列表)
conditions.append("a.alert_type != 'media_display'")
# 支持status参数兼容旧版
if status == 'unhandled':
conditions.append('a.handled = 0')
elif status == 'handled':
conditions.append('a.handled = 1')
# 支持handled参数布尔值
if handled is not None:
if handled.lower() == 'true':
conditions.append('a.handled = 1')
elif handled.lower() == 'false':
conditions.append('a.handled = 0')
# 支持read参数布尔值
if read is not None:
if read.lower() == 'true':
conditions.append('a.read = 1')
elif read.lower() == 'false':
conditions.append('a.read = 0')
# 支持alert_type参数
if alert_type:
conditions.append('a.alert_type = ?')
params.append(alert_type)
# 支持elderly_id参数
if elderly_id:
conditions.append('a.elderly_id = ?')
params.append(elderly_id)
if level:
conditions.append('a.level = ?')
params.append(level)
where_clause = ' AND '.join(conditions)
# 查询总数
cursor.execute(f'''
SELECT COUNT(*) as total FROM family_alerts a
WHERE {where_clause}
''', params)
total = cursor.fetchone()['total']
# 查询数据(包含老人信息)
cursor.execute(f'''
SELECT
a.*,
u.name as elderly_name,
h.name as handler_name
FROM family_alerts a
LEFT JOIN users u ON a.elderly_id = u.id
LEFT JOIN users h ON a.handled_by = h.id
WHERE {where_clause}
ORDER BY a.created_at DESC
LIMIT ? OFFSET ?
''', params + [limit, offset])
alerts = []
for row in cursor.fetchall():
alert = dict(row)
# 转换布尔值
alert['handled'] = bool(alert['handled'])
alert['read'] = bool(alert['read'])
# 解析元数据JSON
if alert['metadata']:
try:
alert['metadata'] = json.loads(alert['metadata'])
except:
alert['metadata'] = {}
alerts.append(alert)
conn.close()
return jsonify({
'alerts': alerts,
'total': total,
'limit': limit,
'offset': offset
})
@app.route('/api/family/alerts', methods=['POST'])
def create_alert():
"""创建新消息/告警(由老人端或系统触发)"""
data = request.json
required_fields = ['family_id', 'alert_type', 'level', 'message']
if not all(field in data for field in required_fields):
return jsonify({'error': '缺少必需字段'}), 400
conn = get_db()
cursor = conn.cursor()
# 处理元数据
metadata = data.get('metadata', {})
metadata_json = json.dumps(metadata) if metadata else None
cursor.execute('''
INSERT INTO family_alerts (
family_id, elderly_id, alert_type, level, title, message,
metadata, source
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''', (
data['family_id'],
data.get('elderly_id'),
data['alert_type'],
data['level'],
data.get('title'),
data['message'],
metadata_json,
data.get('source', 'elderly')
))
alert_id = cursor.lastrowid
conn.commit()
conn.close()
return jsonify({'success': True, 'alert_id': alert_id}), 201
@app.route('/api/family/alerts/<int:alert_id>/handle', methods=['POST'])
def handle_alert(alert_id):
"""标记消息/告警为已处理"""
data = request.json or {}
conn = get_db()
cursor = conn.cursor()
cursor.execute('''
UPDATE family_alerts
SET handled = 1,
handled_at = CURRENT_TIMESTAMP,
handled_by = ?,
reply_message = ?,
updated_at = CURRENT_TIMESTAMP
WHERE id = ?
''', (
data.get('handled_by'),
data.get('reply_message'),
alert_id
))
conn.commit()
conn.close()
return jsonify({'success': True})
@app.route('/api/family/alerts/<int:alert_id>/read', methods=['POST'])
def mark_alert_read(alert_id):
"""标记消息为已读"""
conn = get_db()
cursor = conn.cursor()
cursor.execute('''
UPDATE family_alerts
SET read = 1,
read_at = CURRENT_TIMESTAMP,
updated_at = CURRENT_TIMESTAMP
WHERE id = ?
''', (alert_id,))
conn.commit()
conn.close()
return jsonify({'success': True})
@app.route('/api/family/alerts/<int:alert_id>/reply', methods=['POST'])
def reply_alert(alert_id):
"""家属回复消息"""
data = request.json
if not data or 'reply_message' not in data:
return jsonify({'error': '缺少reply_message字段'}), 400
conn = get_db()
cursor = conn.cursor()
cursor.execute('''
UPDATE family_alerts
SET reply_message = ?,
updated_at = CURRENT_TIMESTAMP
WHERE id = ?
''', (data['reply_message'], alert_id))
conn.commit()
conn.close()
return jsonify({'success': True})
@app.route('/api/family/alerts/<int:alert_id>', methods=['DELETE'])
def delete_alert(alert_id):
"""删除消息/告警(软删除)"""
conn = get_db()
cursor = conn.cursor()
cursor.execute('''
UPDATE family_alerts
SET is_active = 0,
updated_at = CURRENT_TIMESTAMP
WHERE id = ?
''', (alert_id,))
conn.commit()
conn.close()
return jsonify({'success': True})
@app.route('/api/family/alerts/stats', methods=['GET'])
def get_alerts_stats():
"""获取消息统计数据"""
family_id = request.args.get('family_id')
if not family_id:
return jsonify({'error': '缺少family_id参数'}), 400
conn = get_db()
cursor = conn.cursor()
# 统计各级别消息数量(排除媒体展示事件)
cursor.execute('''
SELECT
level,
COUNT(*) as count
FROM family_alerts
WHERE family_id = ? AND is_active = 1 AND alert_type != 'media_display'
GROUP BY level
''', (family_id,))
level_stats = {row['level']: row['count'] for row in cursor.fetchall()}
# 统计各类型消息数量(排除媒体展示事件)
cursor.execute('''
SELECT
alert_type,
COUNT(*) as count
FROM family_alerts
WHERE family_id = ? AND is_active = 1 AND alert_type != 'media_display'
GROUP BY alert_type
''', (family_id,))
type_stats = {row['alert_type']: row['count'] for row in cursor.fetchall()}
# 统计已处理/未处理(排除媒体展示事件)
cursor.execute('''
SELECT
COUNT(CASE WHEN handled = 0 THEN 1 END) as unhandled,
COUNT(CASE WHEN handled = 1 THEN 1 END) as handled,
COUNT(CASE WHEN read = 0 THEN 1 END) as unread
FROM family_alerts
WHERE family_id = ? AND is_active = 1 AND alert_type != 'media_display'
''', (family_id,))
status_stats = dict(cursor.fetchone())
# 今日新增消息数(排除媒体展示事件)
cursor.execute('''
SELECT COUNT(*) as today_count
FROM family_alerts
WHERE family_id = ? AND is_active = 1 AND alert_type != 'media_display'
AND DATE(created_at) = DATE('now')
''', (family_id,))
today_count = cursor.fetchone()['today_count']
conn.close()
return jsonify({
'level_stats': level_stats,
'type_stats': type_stats,
'status_stats': status_stats,
'today_count': today_count
})
# ==================== 家属留言 API ====================
@app.route('/api/family/messages', methods=['GET'])
def get_family_messages():
"""获取家庭所有留言"""
family_id = request.args.get('family_id')
if not family_id:
return jsonify({'error': '缺少family_id参数'}), 400
conn = get_db()
cursor = conn.cursor()
cursor.execute('''
SELECT * FROM family_messages
WHERE family_id = ? AND is_active = 1
ORDER BY created_at DESC
''', (family_id,))
messages = []
for row in cursor.fetchall():
msg = dict(row)
# 转换布尔值
msg['played'] = bool(msg['played'])
msg['liked'] = bool(msg['liked'])
# 转换UTC时间为北京时间
msg['created_at'] = utc_to_beijing(msg['created_at'])
msg['updated_at'] = utc_to_beijing(msg['updated_at'])
if msg.get('played_at'):
msg['played_at'] = utc_to_beijing(msg['played_at'])
messages.append(msg)
conn.close()
return jsonify({'messages': messages})
@app.route('/api/family/messages', methods=['POST'])
def create_message():
"""创建新留言"""
data = request.json
required_fields = ['family_id', 'content', 'sender_name', 'sender_relation', 'scheduled_time']
if not all(field in data for field in required_fields):
return jsonify({'error': '缺少必需字段'}), 400
conn = get_db()
cursor = conn.cursor()
cursor.execute('''
INSERT INTO family_messages (
family_id, content, sender_name, sender_relation, scheduled_time
) VALUES (?, ?, ?, ?, ?)
''', (
data['family_id'],
data['content'],
data['sender_name'],
data['sender_relation'],
data['scheduled_time']
))
message_id = cursor.lastrowid
conn.commit()
conn.close()
return jsonify({'success': True, 'message_id': message_id}), 201
@app.route('/api/family/messages/<int:message_id>', methods=['DELETE'])
def delete_message(message_id):
"""删除留言(软删除)"""
conn = get_db()
cursor = conn.cursor()
cursor.execute('''
UPDATE family_messages
SET is_active = 0, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
''', (message_id,))
conn.commit()
conn.close()
return jsonify({'success': True})
# ==================== 老人端留言 API ====================
@app.route('/api/elderly/messages', methods=['GET'])
def get_elderly_messages():
"""获取老人端的留言列表(按预约时间排序)"""
family_id = request.args.get('family_id')
if not family_id:
return jsonify({'error': '缺少family_id参数'}), 400
conn = get_db()
cursor = conn.cursor()
cursor.execute('''
SELECT * FROM family_messages
WHERE family_id = ? AND is_active = 1
ORDER BY scheduled_time ASC
''', (family_id,))
messages = []
for row in cursor.fetchall():
msg = dict(row)
msg['played'] = bool(msg['played'])
msg['liked'] = bool(msg['liked'])
# 转换UTC时间为北京时间
msg['created_at'] = utc_to_beijing(msg['created_at'])
msg['updated_at'] = utc_to_beijing(msg['updated_at'])
if msg.get('played_at'):
msg['played_at'] = utc_to_beijing(msg['played_at'])
messages.append(msg)
conn.close()
return jsonify({'messages': messages})
@app.route('/api/elderly/messages/pending', methods=['GET'])
def get_pending_messages():
"""获取待播放的留言(预约时间已到但未播放的)"""
family_id = request.args.get('family_id')
if not family_id:
return jsonify({'error': '缺少family_id参数'}), 400
conn = get_db()
cursor = conn.cursor()
# 调试日志 - 使用北京时间
beijing_now = get_beijing_time()
current_time = beijing_now.strftime('%Y-%m-%d %H:%M:%S')
print(f"[DEBUG] 当前北京时间: {current_time}")
cursor.execute('''
SELECT * FROM family_messages
WHERE family_id = ?
AND is_active = 1
AND played = 0
ORDER BY scheduled_time ASC
''', (family_id,))
# 获取所有未播放的消息
all_messages = []
for row in cursor.fetchall():
msg = dict(row)
print(f"[DEBUG] 留言 ID: {msg['id']}, 预约时间: {msg['scheduled_time']}")
all_messages.append(msg)
# 在 Python 中进行时间比较(更可靠) - 使用北京时间
now = beijing_now.replace(tzinfo=None) # 移除时区信息以便比较
messages = []
for msg in all_messages:
try:
# 处理各种可能的时间格式
scheduled_str = msg['scheduled_time']
# 移除 'T',统一为空格分隔
scheduled_str = scheduled_str.replace('T', ' ')
# 如果没有秒数,添加 :00
if len(scheduled_str) == 16: # YYYY-MM-DD HH:MM
scheduled_str += ':00'
scheduled_time = datetime.strptime(scheduled_str, '%Y-%m-%d %H:%M:%S')
print(f"[DEBUG] 解析后时间: {scheduled_time}, 当前北京时间: {now}, 已到期: {scheduled_time <= now}")
if scheduled_time <= now:
msg['played'] = bool(msg['played'])
msg['liked'] = bool(msg['liked'])
# 转换UTC时间为北京时间
msg['created_at'] = utc_to_beijing(msg['created_at'])
msg['updated_at'] = utc_to_beijing(msg['updated_at'])
if msg.get('played_at'):
msg['played_at'] = utc_to_beijing(msg['played_at'])
messages.append(msg)
except Exception as e:
print(f"[DEBUG] 时间解析错误: {e}, 原始值: {msg['scheduled_time']}")
continue
conn.close()
print(f"[DEBUG] 找到 {len(messages)} 条待播放留言")
return jsonify({'messages': messages})
@app.route('/api/elderly/messages/<int:message_id>/play', methods=['POST'])
def play_message(message_id):
"""标记留言为已播放"""
conn = get_db()
cursor = conn.cursor()
cursor.execute('''
UPDATE family_messages
SET played = 1,
played_at = CURRENT_TIMESTAMP,
updated_at = CURRENT_TIMESTAMP
WHERE id = ?
''', (message_id,))
conn.commit()
conn.close()
return jsonify({'success': True})
@app.route('/api/elderly/messages/<int:message_id>/like', methods=['POST'])
def like_message(message_id):
"""老人点赞留言"""
conn = get_db()
cursor = conn.cursor()
cursor.execute('''
UPDATE family_messages
SET liked = 1, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
''', (message_id,))
conn.commit()
conn.close()
return jsonify({'success': True})
@app.route('/api/elderly/messages/<int:message_id>/unlike', methods=['POST'])
def unlike_message(message_id):
"""老人取消点赞"""
conn = get_db()
cursor = conn.cursor()
cursor.execute('''
UPDATE family_messages
SET liked = 0, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
''', (message_id,))
conn.commit()
conn.close()
return jsonify({'success': True})
# ==================== 老人端消息/告警 API ====================
@app.route('/api/elderly/alerts', methods=['POST'])
def create_elderly_alert():
"""老人端创建消息如SOS、联系家人"""
data = request.json
required_fields = ['family_id', 'alert_type', 'level', 'message']
if not all(field in data for field in required_fields):
return jsonify({'error': '缺少必需字段'}), 400
conn = get_db()
cursor = conn.cursor()
# 处理元数据
metadata = data.get('metadata', {})
metadata_json = json.dumps(metadata) if metadata else None
cursor.execute('''
INSERT INTO family_alerts (
family_id, elderly_id, alert_type, level, title, message,
metadata, source
) VALUES (?, ?, ?, ?, ?, ?, ?, 'elderly')
''', (
data['family_id'],
data.get('elderly_id'),
data['alert_type'],
data['level'],
data.get('title'),
data['message'],
metadata_json
))
alert_id = cursor.lastrowid
conn.commit()
conn.close()
return jsonify({'success': True, 'alert_id': alert_id}), 201
@app.route('/api/elderly/alerts/replies', methods=['GET'])
def get_elderly_alert_replies():
"""获取家属对老人消息的回复"""
family_id = request.args.get('family_id')
elderly_id = request.args.get('elderly_id')
if not family_id:
return jsonify({'error': '缺少family_id参数'}), 400
conn = get_db()
cursor = conn.cursor()
conditions = ['family_id = ?', 'is_active = 1', 'reply_message IS NOT NULL']
params = [family_id]
if elderly_id:
conditions.append('elderly_id = ?')
params.append(elderly_id)
where_clause = ' AND '.join(conditions)
cursor.execute(f'''
SELECT
id, alert_type, level, message, reply_message,
handled_at, created_at
FROM family_alerts
WHERE {where_clause}
ORDER BY handled_at DESC
LIMIT 10
''', params)
replies = [dict(row) for row in cursor.fetchall()]
conn.close()
return jsonify({'replies': replies})
# ==================== 情绪记录 API ====================
@app.route('/api/elderly/moods', methods=['POST'])
def create_mood_record():
"""老人端创建情绪记录"""
data = request.json
required_fields = ['family_id', 'mood_type']
if not all(field in data for field in required_fields):
return jsonify({'error': '缺少必需字段'}), 400
# 验证情绪类型
valid_moods = ['happy', 'calm', 'sad', 'anxious', 'angry', 'tired']
if data['mood_type'] not in valid_moods:
return jsonify({'error': '无效的情绪类型'}), 400
# 验证情绪分数范围
mood_score = data.get('mood_score', 5)
if not (1 <= mood_score <= 10):
return jsonify({'error': '情绪分数必须在1-10之间'}), 400
conn = get_db()
cursor = conn.cursor()
cursor.execute('''
INSERT INTO mood_records (
family_id, elderly_id, mood_type, mood_score, note,
source, trigger_event, location, weather, recorded_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
data['family_id'],
data.get('elderly_id'),
data['mood_type'],
mood_score,
data.get('note', ''),
data.get('source', 'manual'),
data.get('trigger_event', ''),
data.get('location', ''),
data.get('weather', ''),
data.get('recorded_at', datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
))
record_id = cursor.lastrowid
conn.commit()
conn.close()
return jsonify({'success': True, 'record_id': record_id}), 201
@app.route('/api/elderly/moods', methods=['GET'])
def get_elderly_moods():
"""获取老人的情绪记录列表"""
family_id = request.args.get('family_id')
elderly_id = request.args.get('elderly_id')
limit = request.args.get('limit', 50, type=int)
offset = request.args.get('offset', 0, type=int)
if not family_id:
return jsonify({'error': '缺少family_id参数'}), 400
conn = get_db()
cursor = conn.cursor()
conditions = ['family_id = ?']
params = [family_id]
if elderly_id:
conditions.append('elderly_id = ?')
params.append(elderly_id)
where_clause = ' AND '.join(conditions)
# 查询总数
cursor.execute(f'''
SELECT COUNT(*) as total FROM mood_records
WHERE {where_clause}
''', params)
total = cursor.fetchone()['total']
# 查询数据
cursor.execute(f'''
SELECT * FROM mood_records
WHERE {where_clause}
ORDER BY recorded_at DESC
LIMIT ? OFFSET ?
''', params + [limit, offset])
records = [dict(row) for row in cursor.fetchall()]
conn.close()
return jsonify({
'records': records,
'total': total,
'limit': limit,
'offset': offset
})
@app.route('/api/elderly/moods/today', methods=['GET'])
def get_today_moods():
"""获取老人今日的情绪记录"""
family_id = request.args.get('family_id')
elderly_id = request.args.get('elderly_id')
if not family_id:
return jsonify({'error': '缺少family_id参数'}), 400
conn = get_db()
cursor = conn.cursor()
today = datetime.now().strftime('%Y-%m-%d')
conditions = ['family_id = ?', 'DATE(recorded_at) = DATE(?)']
params = [family_id, today]
if elderly_id:
conditions.append('elderly_id = ?')
params.append(elderly_id)
where_clause = ' AND '.join(conditions)
cursor.execute(f'''
SELECT * FROM mood_records
WHERE {where_clause}
ORDER BY recorded_at DESC
''', params)
records = [dict(row) for row in cursor.fetchall()]
conn.close()
return jsonify({'records': records})
@app.route('/api/elderly/moods/latest', methods=['GET'])
def get_latest_mood():
"""获取老人最新的情绪记录"""
family_id = request.args.get('family_id')
elderly_id = request.args.get('elderly_id')
if not family_id:
return jsonify({'error': '缺少family_id参数'}), 400
conn = get_db()
cursor = conn.cursor()
conditions = ['family_id = ?']
params = [family_id]
if elderly_id:
conditions.append('elderly_id = ?')
params.append(elderly_id)
where_clause = ' AND '.join(conditions)
cursor.execute(f'''
SELECT * FROM mood_records
WHERE {where_clause}
ORDER BY recorded_at DESC
LIMIT 1
''', params)
row = cursor.fetchone()
conn.close()
if row:
return jsonify({'record': dict(row)})
else:
return jsonify({'record': None})
# ==================== 家属端情绪记录 API ====================
@app.route('/api/family/moods', methods=['GET'])
def get_family_moods():
"""家属端获取老人的情绪记录"""
family_id = request.args.get('family_id')
elderly_id = request.args.get('elderly_id')
mood_type = request.args.get('mood_type')
start_date = request.args.get('start_date')
end_date = request.args.get('end_date')
limit = request.args.get('limit', 100, type=int)
offset = request.args.get('offset', 0, type=int)
if not family_id:
return jsonify({'error': '缺少family_id参数'}), 400
conn = get_db()
cursor = conn.cursor()
conditions = ['m.family_id = ?']
params = [family_id]
if elderly_id:
conditions.append('m.elderly_id = ?')
params.append(elderly_id)
if mood_type:
conditions.append('m.mood_type = ?')
params.append(mood_type)
if start_date:
conditions.append('DATE(m.recorded_at) >= DATE(?)')
params.append(start_date)
if end_date:
conditions.append('DATE(m.recorded_at) <= DATE(?)')
params.append(end_date)
where_clause = ' AND '.join(conditions)
# 查询总数
cursor.execute(f'''
SELECT COUNT(*) as total FROM mood_records m
WHERE {where_clause}
''', params)
total = cursor.fetchone()['total']
# 查询数据(包含老人信息)
cursor.execute(f'''
SELECT
m.*,
u.name as elderly_name
FROM mood_records m
LEFT JOIN users u ON m.elderly_id = u.id
WHERE {where_clause}
ORDER BY m.recorded_at DESC
LIMIT ? OFFSET ?
''', params + [limit, offset])
records = [dict(row) for row in cursor.fetchall()]
conn.close()
return jsonify({
'records': records,
'total': total,
'limit': limit,
'offset': offset
})
@app.route('/api/family/moods/stats', methods=['GET'])
def get_mood_stats():
"""获取情绪统计数据"""
family_id = request.args.get('family_id')
elderly_id = request.args.get('elderly_id')
days = request.args.get('days', 7, type=int) # 统计最近N天
if not family_id:
return jsonify({'error': '缺少family_id参数'}), 400
conn = get_db()
cursor = conn.cursor()
conditions = ['family_id = ?']
params = [family_id]
if elderly_id:
conditions.append('elderly_id = ?')
params.append(elderly_id)
# 添加时间范围条件
conditions.append(f"DATE(recorded_at) >= DATE('now', '-{days} days')")
where_clause = ' AND '.join(conditions)
# 按情绪类型统计
cursor.execute(f'''
SELECT
mood_type,
COUNT(*) as count,
AVG(mood_score) as avg_score
FROM mood_records
WHERE {where_clause}
GROUP BY mood_type
ORDER BY count DESC
''', params)
mood_type_stats = []
for row in cursor.fetchall():
stat = dict(row)
stat['avg_score'] = round(stat['avg_score'], 1) if stat['avg_score'] else 0
mood_type_stats.append(stat)
# 按日期统计平均分数
cursor.execute(f'''
SELECT
DATE(recorded_at) as date,
AVG(mood_score) as avg_score,
COUNT(*) as count
FROM mood_records
WHERE {where_clause}
GROUP BY DATE(recorded_at)
ORDER BY date DESC
''', params)
daily_stats = []
for row in cursor.fetchall():
stat = dict(row)
stat['avg_score'] = round(stat['avg_score'], 1) if stat['avg_score'] else 0
daily_stats.append(stat)
# 计算整体统计
cursor.execute(f'''
SELECT
COUNT(*) as total_records,
AVG(mood_score) as avg_score,
MAX(mood_score) as max_score,
MIN(mood_score) as min_score
FROM mood_records
WHERE {where_clause}
''', params)
overall = dict(cursor.fetchone())
overall['avg_score'] = round(overall['avg_score'], 1) if overall['avg_score'] else 0
# 今日记录数
cursor.execute(f'''
SELECT COUNT(*) as today_count
FROM mood_records
WHERE {where_clause.replace(f"DATE(recorded_at) >= DATE('now', '-{days} days')", "DATE(recorded_at) = DATE('now')")}
''', params)
today_count = cursor.fetchone()['today_count']
conn.close()
return jsonify({
'mood_type_stats': mood_type_stats,
'daily_stats': daily_stats,
'overall': overall,
'today_count': today_count,
'days': days
})
@app.route('/api/family/moods/trend', methods=['GET'])
def get_mood_trend():
"""获取情绪趋势数据"""
family_id = request.args.get('family_id')
elderly_id = request.args.get('elderly_id')
days = request.args.get('days', 30, type=int)
if not family_id:
return jsonify({'error': '缺少family_id参数'}), 400
conn = get_db()
cursor = conn.cursor()
conditions = ['family_id = ?']
params = [family_id]
if elderly_id:
conditions.append('elderly_id = ?')
params.append(elderly_id)
conditions.append(f"DATE(recorded_at) >= DATE('now', '-{days} days')")
where_clause = ' AND '.join(conditions)
# 按日期获取情绪趋势
cursor.execute(f'''
SELECT
DATE(recorded_at) as date,
mood_type,
AVG(mood_score) as avg_score,
COUNT(*) as count
FROM mood_records
WHERE {where_clause}
GROUP BY DATE(recorded_at), mood_type
ORDER BY date ASC, count DESC
''', params)
trend_data = []
for row in cursor.fetchall():
item = dict(row)
item['avg_score'] = round(item['avg_score'], 1) if item['avg_score'] else 0
trend_data.append(item)
conn.close()
return jsonify({
'trend': trend_data,
'days': days
})
# ==================== 老人端 API ====================
@app.route('/api/elderly/schedules/today', methods=['GET'])
def get_today_schedules():
"""获取老人今日日程"""
family_id = request.args.get('family_id')
if not family_id:
return jsonify({'error': '缺少family_id参数'}), 400
conn = get_db()
cursor = conn.cursor()
# 获取今天的日期范围
today = datetime.now().strftime('%Y-%m-%d')
cursor.execute('''
SELECT * FROM schedules
WHERE family_id = ?
AND is_active = 1
AND (
(repeat_type = 'once' AND DATE(schedule_time) = DATE(?))
OR repeat_type = 'daily'
OR (repeat_type = 'weekly' AND CAST(strftime('%w', ?) AS INTEGER) IN (
SELECT value FROM json_each(repeat_days)
))
)
ORDER BY TIME(schedule_time)
''', (family_id, today, today))
schedules = [dict(row) for row in cursor.fetchall()]
conn.close()
return jsonify({'schedules': schedules})
@app.route('/api/elderly/schedules/upcoming', methods=['GET'])
def get_upcoming_schedules():
"""获取即将到来的日程(下一小时内)"""
family_id = request.args.get('family_id')
elderly_id = request.args.get('elderly_id')
if not family_id:
return jsonify({'error': '缺少family_id参数'}), 400
conn = get_db()
cursor = conn.cursor()
cursor.execute('''
SELECT * FROM schedules
WHERE family_id = ?
AND is_active = 1
AND datetime(schedule_time) BETWEEN datetime('now') AND datetime('now', '+1 hour')
ORDER BY schedule_time
''', (family_id,))
schedules = [dict(row) for row in cursor.fetchall()]
conn.close()
return jsonify({'schedules': schedules})
@app.route('/api/elderly/reminders/<int:reminder_id>/complete', methods=['POST'])
def complete_reminder(reminder_id):
"""标记提醒为已完成"""
conn = get_db()
cursor = conn.cursor()
cursor.execute('''
UPDATE reminders
SET status = 'completed', completed_at = CURRENT_TIMESTAMP
WHERE id = ?
''', (reminder_id,))
conn.commit()
conn.close()
return jsonify({'success': True})
@app.route('/api/elderly/reminders/<int:reminder_id>/dismiss', methods=['POST'])
def dismiss_reminder(reminder_id):
"""忽略提醒"""
conn = get_db()
cursor = conn.cursor()
cursor.execute('''
UPDATE reminders
SET status = 'dismissed'
WHERE id = ?
''', (reminder_id,))
conn.commit()
conn.close()
return jsonify({'success': True})
@app.route('/api/elderly/schedules/<int:schedule_id>/status', methods=['POST'])
def update_schedule_status(schedule_id):
"""更新日程状态"""
data = request.json
status = data.get('status') # pending, completed, skipped, missed
if status not in ['pending', 'completed', 'skipped', 'missed']:
return jsonify({'error': '无效的状态值'}), 400
conn = get_db()
cursor = conn.cursor()
# 如果状态是 completed记录完成时间
if status == 'completed':
cursor.execute('''
UPDATE schedules
SET status = ?, completed_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
''', (status, schedule_id))
else:
cursor.execute('''
UPDATE schedules
SET status = ?, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
''', (status, schedule_id))
conn.commit()
conn.close()
return jsonify({'success': True})
# ==================== 用户管理 API ====================
@app.route('/api/users', methods=['POST'])
def create_user():
"""创建用户"""
data = request.json
required_fields = ['user_type', 'name', 'family_id']
if not all(field in data for field in required_fields):
return jsonify({'error': '缺少必需字段'}), 400
conn = get_db()
cursor = conn.cursor()
cursor.execute('''
INSERT INTO users (user_type, name, phone, family_id)
VALUES (?, ?, ?, ?)
''', (
data['user_type'],
data['name'],
data.get('phone', ''),
data['family_id']
))
user_id = cursor.lastrowid
conn.commit()
conn.close()
return jsonify({'success': True, 'user_id': user_id}), 201
@app.route('/api/users/<string:family_id>', methods=['GET'])
def get_family_users(family_id):
"""获取家庭成员列表"""
conn = get_db()
cursor = conn.cursor()
cursor.execute('''
SELECT * FROM users
WHERE family_id = ?
ORDER BY user_type, created_at
''', (family_id,))
users = [dict(row) for row in cursor.fetchall()]
conn.close()
return jsonify({'users': users})
# ==================== 媒体库 API ====================
@app.route('/api/family/media', methods=['POST'])
def upload_media():
"""家属端上传媒体文件"""
# 检查是否有文件
if 'file' not in request.files:
return jsonify({'error': '没有上传文件'}), 400
file = request.files['file']
if file.filename == '':
return jsonify({'error': '文件名为空'}), 400
if not allowed_file(file.filename):
return jsonify({'error': '不支持的文件类型'}), 400
# 获取其他表单数据
family_id = request.form.get('family_id')
title = request.form.get('title')
description = request.form.get('description', '')
uploaded_by = request.form.get('uploaded_by')
if not family_id or not title:
return jsonify({'error': '缺少必需字段'}), 400
# 保存文件
filename = secure_filename(file.filename)
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
unique_filename = f"{timestamp}_{filename}"
file_path = os.path.join(app.config['UPLOAD_FOLDER'], unique_filename)
file.save(file_path)
# 判断媒体类型
ext = filename.rsplit('.', 1)[1].lower()
media_type = 'video' if ext in {'mp4', 'mov', 'avi'} else 'photo'
# 获取文件大小
file_size = os.path.getsize(file_path)
# 生成缩略图
thumbnail_path = None
if media_type == 'video':
thumbnail_path = generate_video_thumbnail(file_path, unique_filename)
elif media_type == 'photo':
thumbnail_path = generate_photo_thumbnail(file_path, unique_filename)
# 插入数据库
conn = get_db()
cursor = conn.cursor()
cursor.execute('''
INSERT INTO media (
family_id, media_type, title, description,
file_path, file_size, thumbnail_path, uploaded_by
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''', (family_id, media_type, title, description, file_path, file_size, thumbnail_path, uploaded_by))
media_id = cursor.lastrowid
# 创建默认触发策略
cursor.execute('''
INSERT INTO media_policies (media_id, time_windows, moods, occasions, cooldown, priority)
VALUES (?, ?, ?, ?, ?, ?)
''', (media_id, '[]', '[]', '[]', 60, 5))
conn.commit()
conn.close()
return jsonify({
'success': True,
'media_id': media_id,
'file_path': file_path,
'media_type': media_type
}), 201
@app.route('/api/family/media', methods=['GET'])
def get_family_media():
"""获取家庭所有媒体列表"""
family_id = request.args.get('family_id')
if not family_id:
return jsonify({'error': '缺少family_id参数'}), 400
conn = get_db()
cursor = conn.cursor()
# 获取媒体列表及其标签和策略
cursor.execute('''
SELECT
m.*,
p.time_windows,
p.moods,
p.occasions,
p.cooldown,
p.priority,
p.play_count,
p.last_played_at,
GROUP_CONCAT(t.tag) as tags
FROM media m
LEFT JOIN media_policies p ON m.id = p.media_id
LEFT JOIN media_tags t ON m.id = t.media_id
WHERE m.family_id = ? AND m.is_active = 1
GROUP BY m.id
ORDER BY m.created_at DESC
''', (family_id,))
media_list = []
for row in cursor.fetchall():
media_dict = dict(row)
# 解析标签
if media_dict['tags']:
media_dict['tags'] = media_dict['tags'].split(',')
else:
media_dict['tags'] = []
# 解析JSON字段
for field in ['time_windows', 'moods', 'occasions']:
try:
media_dict[field] = json.loads(media_dict[field]) if media_dict[field] else []
except:
media_dict[field] = []
media_list.append(media_dict)
conn.close()
return jsonify({'media': media_list})
@app.route('/api/family/media/<int:media_id>', methods=['GET'])
def get_media_detail(media_id):
"""获取媒体详情"""
conn = get_db()
cursor = conn.cursor()
cursor.execute('''
SELECT
m.*,
p.time_windows,
p.moods,
p.occasions,
p.cooldown,
p.priority,
p.play_count,
p.last_played_at
FROM media m
LEFT JOIN media_policies p ON m.id = p.media_id
WHERE m.id = ?
''', (media_id,))
row = cursor.fetchone()
if not row:
conn.close()
return jsonify({'error': '媒体不存在'}), 404
media_dict = dict(row)
# 获取标签
cursor.execute('SELECT tag FROM media_tags WHERE media_id = ?', (media_id,))
media_dict['tags'] = [row['tag'] for row in cursor.fetchall()]
# 解析JSON字段
for field in ['time_windows', 'moods', 'occasions']:
try:
media_dict[field] = json.loads(media_dict[field]) if media_dict[field] else []
except:
media_dict[field] = []
# 获取播放统计
cursor.execute('''
SELECT
COUNT(*) as total_plays,
SUM(CASE WHEN feedback_type = 'like' THEN 1 ELSE 0 END) as likes,
SUM(CASE WHEN feedback_type = 'dislike' THEN 1 ELSE 0 END) as dislikes
FROM media_play_history mph
LEFT JOIN media_feedback mf ON mph.media_id = mf.media_id AND mph.elderly_id = mf.elderly_id
WHERE mph.media_id = ?
''', (media_id,))
stats = dict(cursor.fetchone())
media_dict['statistics'] = stats
conn.close()
return jsonify(media_dict)
@app.route('/api/family/media/<int:media_id>', methods=['PUT'])
def update_media(media_id):
"""更新媒体信息和触发策略"""
data = request.json
conn = get_db()
cursor = conn.cursor()
# 更新媒体基本信息
if 'title' in data or 'description' in data:
update_fields = []
params = []
if 'title' in data:
update_fields.append('title = ?')
params.append(data['title'])
if 'description' in data:
update_fields.append('description = ?')
params.append(data['description'])
update_fields.append('updated_at = CURRENT_TIMESTAMP')
params.append(media_id)
cursor.execute(f'''
UPDATE media
SET {', '.join(update_fields)}
WHERE id = ?
''', params)
# 更新标签
if 'tags' in data:
# 删除旧标签
cursor.execute('DELETE FROM media_tags WHERE media_id = ?', (media_id,))
# 添加新标签
for tag in data['tags']:
cursor.execute('''
INSERT INTO media_tags (media_id, tag)
VALUES (?, ?)
''', (media_id, tag))
# 更新触发策略
policy_fields = ['time_windows', 'moods', 'occasions', 'cooldown', 'priority']
policy_updates = []
policy_params = []
for field in policy_fields:
if field in data:
policy_updates.append(f'{field} = ?')
# JSON字段需要序列化
if field in ['time_windows', 'moods', 'occasions']:
policy_params.append(json.dumps(data[field]))
else:
policy_params.append(data[field])
if policy_updates:
policy_updates.append('updated_at = CURRENT_TIMESTAMP')
policy_params.append(media_id)
cursor.execute(f'''
UPDATE media_policies
SET {', '.join(policy_updates)}
WHERE media_id = ?
''', policy_params)
conn.commit()
conn.close()
return jsonify({'success': True})
@app.route('/api/family/media/<int:media_id>', methods=['DELETE'])
def delete_media(media_id):
"""删除媒体(软删除)"""
conn = get_db()
cursor = conn.cursor()
cursor.execute('''
UPDATE media
SET is_active = 0, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
''', (media_id,))
conn.commit()
conn.close()
return jsonify({'success': True})
# ==================== 老人端媒体 API ====================
@app.route('/api/elderly/media/recommended', methods=['GET'])
def get_recommended_media():
"""获取推荐媒体(基于时段、心境、场合等策略)"""
family_id = request.args.get('family_id')
elderly_id = request.args.get('elderly_id')
current_mood = request.args.get('mood', '') # 当前心境
occasion = request.args.get('occasion', '') # 特殊场合
if not family_id:
return jsonify({'error': '缺少family_id参数'}), 400
conn = get_db()
cursor = conn.cursor()
# 获取当前时间
now = datetime.now()
current_time = now.strftime('%H:%M')
# 查询符合条件的媒体
cursor.execute('''
SELECT
m.*,
p.time_windows,
p.moods,
p.occasions,
p.cooldown,
p.priority,
p.play_count,
p.last_played_at,
GROUP_CONCAT(t.tag) as tags
FROM media m
INNER JOIN media_policies p ON m.id = p.media_id
LEFT JOIN media_tags t ON m.id = t.media_id
WHERE m.family_id = ? AND m.is_active = 1
GROUP BY m.id
ORDER BY p.priority DESC, p.play_count ASC
''', (family_id,))
recommended = []
for row in cursor.fetchall():
media_dict = dict(row)
# 解析JSON字段
time_windows = json.loads(media_dict['time_windows']) if media_dict['time_windows'] else []
moods = json.loads(media_dict['moods']) if media_dict['moods'] else []
occasions = json.loads(media_dict['occasions']) if media_dict['occasions'] else []
# 检查冷却时间
if media_dict['last_played_at']:
last_played = datetime.fromisoformat(media_dict['last_played_at'])
cooldown_minutes = media_dict['cooldown']
if now - last_played < timedelta(minutes=cooldown_minutes):
continue # 还在冷却期,跳过
# 检查时段匹配
time_match = not time_windows # 如果没有设置时段,默认匹配
for window in time_windows:
if '-' in window:
start, end = window.split('-')
if start <= current_time <= end:
time_match = True
break
if not time_match:
continue
# 检查心境匹配
mood_match = not moods or not current_mood or current_mood in moods
if not mood_match:
continue
# 检查场合匹配
occasion_match = not occasions or not occasion or occasion in occasions
if not occasion_match:
continue
# 解析标签
if media_dict['tags']:
media_dict['tags'] = media_dict['tags'].split(',')
else:
media_dict['tags'] = []
recommended.append(media_dict)
conn.close()
return jsonify({'media': recommended})
@app.route('/api/elderly/media/<int:media_id>/play', methods=['POST'])
def record_media_play(media_id):
"""记录媒体播放"""
data = request.json
elderly_id = data.get('elderly_id')
duration_watched = data.get('duration_watched', 0)
completed = data.get('completed', 0)
triggered_by = data.get('triggered_by', 'manual')
mood_before = data.get('mood_before', '')
mood_after = data.get('mood_after', '')
if not elderly_id:
return jsonify({'error': '缺少elderly_id'}), 400
conn = get_db()
cursor = conn.cursor()
# 记录播放历史
cursor.execute('''
INSERT INTO media_play_history (
media_id, elderly_id, duration_watched, completed,
triggered_by, mood_before, mood_after
) VALUES (?, ?, ?, ?, ?, ?, ?)
''', (media_id, elderly_id, duration_watched, completed,
triggered_by, mood_before, mood_after))
# 更新媒体策略的播放次数和最后播放时间
cursor.execute('''
UPDATE media_policies
SET play_count = play_count + 1,
last_played_at = CURRENT_TIMESTAMP
WHERE media_id = ?
''', (media_id,))
conn.commit()
conn.close()
return jsonify({'success': True})
@app.route('/api/elderly/media/<int:media_id>/feedback', methods=['POST'])
def submit_media_feedback(media_id):
"""提交媒体反馈(点赞/点踩)"""
data = request.json
elderly_id = data.get('elderly_id')
feedback_type = data.get('feedback_type') # 'like' 或 'dislike'
if not elderly_id or feedback_type not in ['like', 'dislike']:
return jsonify({'error': '参数错误'}), 400
conn = get_db()
cursor = conn.cursor()
# 使用 INSERT OR REPLACE 来处理重复反馈
cursor.execute('''
INSERT OR REPLACE INTO media_feedback (media_id, elderly_id, feedback_type)
VALUES (?, ?, ?)
''', (media_id, elderly_id, feedback_type))
conn.commit()
conn.close()
return jsonify({'success': True})
@app.route('/api/elderly/media/history', methods=['GET'])
def get_media_history():
"""获取媒体播放历史"""
elderly_id = request.args.get('elderly_id')
limit = request.args.get('limit', 50)
if not elderly_id:
return jsonify({'error': '缺少elderly_id参数'}), 400
conn = get_db()
cursor = conn.cursor()
cursor.execute('''
SELECT
mph.*,
m.title,
m.media_type,
m.file_path,
mf.feedback_type
FROM media_play_history mph
INNER JOIN media m ON mph.media_id = m.id
LEFT JOIN media_feedback mf ON mph.media_id = mf.media_id AND mph.elderly_id = mf.elderly_id
WHERE mph.elderly_id = ?
ORDER BY mph.played_at DESC
LIMIT ?
''', (elderly_id, limit))
history = [dict(row) for row in cursor.fetchall()]
conn.close()
return jsonify({'history': history})
@app.route('/api/family/media/recent-plays', methods=['GET'])
def get_recent_plays():
"""获取最近播放的媒体(家属端查看)"""
family_id = request.args.get('family_id')
limit = request.args.get('limit', 10)
if not family_id:
return jsonify({'error': '缺少family_id参数'}), 400
conn = get_db()
cursor = conn.cursor()
cursor.execute('''
SELECT
m.id,
m.title,
m.media_type,
m.thumbnail_path,
mph.played_at,
COUNT(CASE WHEN mf.feedback_type = 'like' THEN 1 END) as likes,
COUNT(CASE WHEN mf.feedback_type = 'dislike' THEN 1 END) as dislikes
FROM media m
INNER JOIN media_play_history mph ON m.id = mph.media_id
LEFT JOIN media_feedback mf ON m.id = mf.media_id
WHERE m.family_id = ?
GROUP BY m.id, mph.played_at
ORDER BY mph.played_at DESC
LIMIT ?
''', (family_id, limit))
recent = [dict(row) for row in cursor.fetchall()]
conn.close()
return jsonify({'recent_plays': recent})
# ==================== 静态文件服务 ====================
@app.route('/uploads/<path:filename>')
def serve_upload(filename):
"""提供上传文件的访问"""
return send_from_directory(app.config['UPLOAD_FOLDER'], filename)
# ==================== 数字人媒体展示 API ====================
@app.route('/api/elderly/show-media', methods=['POST'])
def show_media_on_avatar():
"""
控制老人端在数字人主页中部弹出透明窗口展示媒体文件
参数:
- media_title: 媒体标题(用于查找媒体文件)
- avatar_text: 数字人播报内容
- duration: 展示时长(),默认30秒
"""
data = request.json
required_fields = ['media_title', 'avatar_text']
if not all(field in data for field in required_fields):
return jsonify({'error': '缺少必需字段: media_title 和 avatar_text'}), 400
media_title = data['media_title']
avatar_text = data['avatar_text']
duration = data.get('duration', 30) # 默认30秒
# 从数据库查找媒体文件
conn = get_db()
cursor = conn.cursor()
cursor.execute('''
SELECT id, media_type, file_path, title
FROM media
WHERE title = ? AND is_active = 1
LIMIT 1
''', (media_title,))
media_row = cursor.fetchone()
if not media_row:
conn.close()
return jsonify({'error': f'未找到标题为 "{media_title}" 的媒体文件'}), 404
media_dict = dict(media_row)
media_type = media_dict['media_type']
file_path = media_dict['file_path']
# 提取文件名(不含路径)
media_filename = os.path.basename(file_path)
try:
# 1. 推送播报内容到数字人5000端口
avatar_response = requests.post(
'http://127.0.0.1:5000/transparent-pass',
json={
'user': 'User',
'text': avatar_text
},
timeout=5
)
if not avatar_response.ok:
print(f'推送数字人播报失败: {avatar_response.status_code}')
# 2. 通知老人端弹出媒体展示窗口
# 创建媒体展示事件(使用 family_alerts 表的特殊类型)
cursor.execute('''
INSERT INTO family_alerts (
family_id, alert_type, level, title, message, metadata, source
) VALUES (?, ?, ?, ?, ?, ?, ?)
''', (
data.get('family_id', 'family_001'),
'media_display', # 特殊类型:媒体展示
'low',
media_title, # 使用媒体标题作为标题
avatar_text,
json.dumps({
'media_filename': media_filename,
'media_type': media_type,
'media_title': media_title,
'avatar_text': avatar_text,
'duration': duration,
'event_type': 'show_media'
}),
'system'
))
event_id = cursor.lastrowid
conn.commit()
conn.close()
return jsonify({
'success': True,
'event_id': event_id,
'message': '媒体展示请求已发送'
}), 201
except Exception as e:
print(f'处理媒体展示请求失败: {e}')
return jsonify({'error': str(e)}), 500
@app.route('/api/elderly/hide-media', methods=['POST'])
def hide_media_on_avatar():
"""
控制老人端关闭当前显示的媒体窗口
参数:
- family_id: 家庭ID可选默认family_001
"""
data = request.json or {}
family_id = data.get('family_id', 'family_001')
conn = get_db()
cursor = conn.cursor()
try:
# 创建隐藏媒体事件
cursor.execute('''
INSERT INTO family_alerts (
family_id, alert_type, level, title, message, metadata, source
) VALUES (?, ?, ?, ?, ?, ?, ?)
''', (
family_id,
'media_display', # 使用相同类型
'low',
'关闭媒体显示',
'关闭当前显示的媒体',
json.dumps({
'event_type': 'hide_media'
}),
'system'
))
event_id = cursor.lastrowid
conn.commit()
conn.close()
return jsonify({
'success': True,
'event_id': event_id,
'message': '关闭媒体请求已发送'
}), 201
except Exception as e:
print(f'处理关闭媒体请求失败: {e}')
conn.close()
return jsonify({'error': str(e)}), 500
@app.route('/api/elderly/poll-media-events', methods=['GET'])
def poll_media_events():
"""
老人端轮询媒体展示事件
"""
family_id = request.args.get('family_id', 'family_001')
conn = get_db()
cursor = conn.cursor()
# 查询未读的媒体展示事件
cursor.execute('''
SELECT * FROM family_alerts
WHERE family_id = ?
AND alert_type = 'media_display'
AND read = 0
AND is_active = 1
ORDER BY created_at DESC
LIMIT 1
''', (family_id,))
row = cursor.fetchone()
if row:
alert = dict(row)
# 解析元数据
if alert['metadata']:
try:
alert['metadata'] = json.loads(alert['metadata'])
except:
alert['metadata'] = {}
# 标记为已读
cursor.execute('''
UPDATE family_alerts
SET read = 1, read_at = CURRENT_TIMESTAMP
WHERE id = ?
''', (alert['id'],))
conn.commit()
conn.close()
return jsonify({'event': alert})
else:
conn.close()
return jsonify({'event': None})
# ==================== 老人端Toast通知 ====================
# 全局变量存储待显示的toast内存中重启会丢失
pending_toasts = {} # key: family_id, value: list of toast objects
# SSE连接管理
sse_clients = {} # key: family_id, value: list of response queues
@app.route('/api/elderly/toast', methods=['POST'])
def create_toast():
"""创建老人端Toast通知供MCP工具调用"""
data = request.json
family_id = data.get('family_id')
toast_type = data.get('type', 'info') # success, info, calling
message = data.get('message')
duration = data.get('duration', 3000) # 默认3秒
if not family_id or not message:
return jsonify({'error': '缺少必需参数'}), 400
# 创建toast对象
toast = {
'id': int(time.time() * 1000), # 使用时间戳作为ID
'type': toast_type,
'message': message,
'duration': duration,
'created_at': datetime.now().isoformat()
}
# 添加到待显示列表(备用轮询方式)
if family_id not in pending_toasts:
pending_toasts[family_id] = []
pending_toasts[family_id].append(toast)
# 通过SSE推送给连接的客户端
if family_id in sse_clients:
for client_queue in sse_clients[family_id]:
try:
client_queue.put(toast)
except:
pass # 客户端可能已断开
return jsonify({'success': True, 'toast_id': toast['id']}), 201
@app.route('/api/elderly/toast/poll', methods=['GET'])
def poll_toast():
"""老人端轮询获取待显示的Toast备用方案"""
family_id = request.args.get('family_id')
if not family_id:
return jsonify({'error': '缺少family_id参数'}), 400
# 获取并清空该family的待显示toast
toasts = pending_toasts.get(family_id, [])
if toasts:
# 返回最新的toast并从列表中移除
toast = toasts.pop(0)
return jsonify({'toast': toast})
return jsonify({'toast': None})
@app.route('/api/elderly/toast/stream', methods=['GET'])
def toast_stream():
"""SSE端点实时推送Toast通知"""
family_id = request.args.get('family_id')
if not family_id:
return jsonify({'error': '缺少family_id参数'}), 400
def generate():
import queue
# 为此客户端创建队列
client_queue = queue.Queue()
# 注册客户端
if family_id not in sse_clients:
sse_clients[family_id] = []
sse_clients[family_id].append(client_queue)
try:
# 发送连接成功消息
yield f"data: {json.dumps({'type': 'connected'})}\n\n"
# 持续监听队列
while True:
try:
# 等待新的toast30秒超时发送心跳
toast = client_queue.get(timeout=30)
yield f"data: {json.dumps(toast)}\n\n"
except queue.Empty:
# 发送心跳保持连接
yield f": heartbeat\n\n"
finally:
# 客户端断开时清理
if family_id in sse_clients:
sse_clients[family_id].remove(client_queue)
if not sse_clients[family_id]:
del sse_clients[family_id]
return Response(
generate(),
mimetype='text/event-stream',
headers={
'Cache-Control': 'no-cache',
'X-Accel-Buffering': 'no',
'Connection': 'keep-alive'
}
)
# ==================== 健康检查 ====================
@app.route('/api/health', methods=['GET'])
def health_check():
"""健康检查端点"""
return jsonify({'status': 'ok', 'timestamp': datetime.now().isoformat()})
if __name__ == '__main__':
# 初始化数据库
init_db()
print("数据库初始化完成")
# 启动应用
app.run(host='0.0.0.0', port=8000, debug=True)