""" KinEcho 服务端 - Flask应用主入口 支持家人端和老人端的日程同步管理 """ from flask import Flask, request, jsonify, send_from_directory, Response from flask_cors import CORS from datetime import datetime, timedelta, timezone import sqlite3 import os import json import time import requests from werkzeug.utils import secure_filename app = Flask(__name__) CORS(app) # 允许跨域请求 # 北京时区 (UTC+8) BEIJING_TZ = timezone(timedelta(hours=8)) def get_beijing_time(): """获取当前北京时间""" return datetime.now(BEIJING_TZ) def utc_to_beijing(utc_str): """将UTC时间字符串转换为北京时间字符串""" if not utc_str: return None try: # 解析UTC时间(SQLite的CURRENT_TIMESTAMP格式) utc_dt = datetime.strptime(utc_str, '%Y-%m-%d %H:%M:%S') # 添加UTC时区信息 utc_dt = utc_dt.replace(tzinfo=timezone.utc) # 转换为北京时间 beijing_dt = utc_dt.astimezone(BEIJING_TZ) # 返回不带时区信息的字符串 return beijing_dt.strftime('%Y-%m-%d %H:%M:%S') except: return utc_str # 数据库配置 DB_PATH = os.path.join(os.path.dirname(__file__), 'kinecho.db') # 文件上传配置 UPLOAD_FOLDER = os.path.join(os.path.dirname(__file__), 'uploads') ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'mp4', 'mov', 'avi'} MAX_FILE_SIZE = 100 * 1024 * 1024 # 100MB # 确保上传目录存在 os.makedirs(UPLOAD_FOLDER, exist_ok=True) os.makedirs(os.path.join(UPLOAD_FOLDER, 'thumbnails'), exist_ok=True) app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER app.config['MAX_CONTENT_LENGTH'] = MAX_FILE_SIZE def allowed_file(filename): """检查文件扩展名是否允许""" return '.' 
in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS def generate_video_thumbnail(video_path, filename): """使用ffmpeg生成视频缩略图""" import subprocess thumbnail_filename = filename.rsplit('.', 1)[0] + '_thumb.jpg' thumbnail_path = os.path.join(UPLOAD_FOLDER, 'thumbnails', thumbnail_filename) try: # 使用ffmpeg截取第1秒的帧作为缩略图 cmd = [ 'ffmpeg', '-i', video_path, '-ss', '00:00:01', '-vframes', '1', '-vf', 'scale=320:-1', '-y', thumbnail_path ] subprocess.run(cmd, capture_output=True, check=True, timeout=30) return thumbnail_path except Exception as e: print(f'生成视频缩略图失败: {e}') return None def generate_photo_thumbnail(photo_path, filename): """生成图片缩略图""" from PIL import Image thumbnail_filename = filename.rsplit('.', 1)[0] + '_thumb.jpg' thumbnail_path = os.path.join(UPLOAD_FOLDER, 'thumbnails', thumbnail_filename) try: with Image.open(photo_path) as img: # 转换为RGB(处理PNG等格式) if img.mode in ('RGBA', 'P'): img = img.convert('RGB') # 生成缩略图,保持比例 img.thumbnail((320, 320)) img.save(thumbnail_path, 'JPEG', quality=85) return thumbnail_path except Exception as e: print(f'生成图片缩略图失败: {e}') return None def get_db(): """获取数据库连接""" conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row # 返回字典格式 return conn def init_db(): """初始化数据库""" conn = get_db() cursor = conn.cursor() # 用户表 cursor.execute(''' CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_type TEXT NOT NULL, -- 'family' 或 'elderly' name TEXT NOT NULL, phone TEXT, family_id TEXT, -- 家庭组ID,关联家人和老人 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') # 日程/护理计划表 cursor.execute(''' CREATE TABLE IF NOT EXISTS schedules ( id INTEGER PRIMARY KEY AUTOINCREMENT, family_id TEXT NOT NULL, -- 家庭组ID title TEXT NOT NULL, -- 日程标题 description TEXT, -- 详细描述 schedule_type TEXT, -- 类型:medication(用药)、exercise(运动)、meal(饮食)、checkup(检查)等 schedule_time TIMESTAMP NOT NULL, -- 日程时间 repeat_type TEXT DEFAULT 'once', -- 重复类型:once, daily, weekly, monthly repeat_days TEXT, -- 重复的星期几,JSON格式:[1,3,5] status TEXT 
DEFAULT 'pending', -- 状态:pending(待执行), completed(已完成), skipped(已放弃), missed(已错过) completed_at TIMESTAMP, -- 完成时间 auto_remind INTEGER DEFAULT 1, -- 数字人自动播报:1=启用,0=禁用 is_active INTEGER DEFAULT 1, -- 是否启用 created_by INTEGER, -- 创建者用户ID created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (created_by) REFERENCES users(id) ) ''') # 提醒记录表 cursor.execute(''' CREATE TABLE IF NOT EXISTS reminders ( id INTEGER PRIMARY KEY AUTOINCREMENT, schedule_id INTEGER NOT NULL, elderly_id INTEGER NOT NULL, -- 老人用户ID remind_time TIMESTAMP NOT NULL, -- 提醒时间 status TEXT DEFAULT 'pending', -- pending, completed, missed, dismissed completed_at TIMESTAMP, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (schedule_id) REFERENCES schedules(id), FOREIGN KEY (elderly_id) REFERENCES users(id) ) ''') # 媒体文件表 cursor.execute(''' CREATE TABLE IF NOT EXISTS media ( id INTEGER PRIMARY KEY AUTOINCREMENT, family_id TEXT NOT NULL, -- 家庭组ID media_type TEXT NOT NULL, -- 'photo' 或 'video' title TEXT NOT NULL, -- 媒体标题 description TEXT, -- 描述 file_path TEXT NOT NULL, -- 文件存储路径 file_size INTEGER, -- 文件大小(字节) duration INTEGER, -- 视频时长(秒),仅视频有值 thumbnail_path TEXT, -- 缩略图路径 uploaded_by INTEGER, -- 上传者用户ID is_active INTEGER DEFAULT 1, -- 是否启用 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (uploaded_by) REFERENCES users(id) ) ''') # 媒体标签表 cursor.execute(''' CREATE TABLE IF NOT EXISTS media_tags ( id INTEGER PRIMARY KEY AUTOINCREMENT, media_id INTEGER NOT NULL, tag TEXT NOT NULL, -- 标签内容,如 '孙女小米', '生日', '旅行' 等 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (media_id) REFERENCES media(id) ON DELETE CASCADE ) ''') # 媒体触发策略表 cursor.execute(''' CREATE TABLE IF NOT EXISTS media_policies ( id INTEGER PRIMARY KEY AUTOINCREMENT, media_id INTEGER NOT NULL, time_windows TEXT, -- 播放时段,JSON格式:["07:00-09:00", "19:00-21:00"] moods TEXT, -- 适合心境,JSON格式:["happy", "sad", "calm"] occasions TEXT, 
-- 特殊场合,JSON格式:["birthday", "anniversary"] cooldown INTEGER DEFAULT 60, -- 冷却时间(分钟),避免重复播放 priority INTEGER DEFAULT 5, -- 优先级 1-10,数字越大优先级越高 last_played_at TIMESTAMP, -- 上次播放时间 play_count INTEGER DEFAULT 0, -- 播放次数 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (media_id) REFERENCES media(id) ON DELETE CASCADE ) ''') # 媒体播放历史表 cursor.execute(''' CREATE TABLE IF NOT EXISTS media_play_history ( id INTEGER PRIMARY KEY AUTOINCREMENT, media_id INTEGER NOT NULL, elderly_id INTEGER NOT NULL, -- 老人用户ID played_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, -- 播放时间 duration_watched INTEGER, -- 观看时长(秒) completed INTEGER DEFAULT 0, -- 是否看完:1=是,0=否 triggered_by TEXT, -- 触发方式:'auto'=自动, 'manual'=手动, 'mood'=情绪触发等 mood_before TEXT, -- 播放前情绪状态 mood_after TEXT, -- 播放后情绪状态 FOREIGN KEY (media_id) REFERENCES media(id), FOREIGN KEY (elderly_id) REFERENCES users(id) ) ''') # 媒体反馈表(点赞/点踩) cursor.execute(''' CREATE TABLE IF NOT EXISTS media_feedback ( id INTEGER PRIMARY KEY AUTOINCREMENT, media_id INTEGER NOT NULL, elderly_id INTEGER NOT NULL, -- 老人用户ID feedback_type TEXT NOT NULL, -- 'like' 或 'dislike' created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (media_id) REFERENCES media(id), FOREIGN KEY (elderly_id) REFERENCES users(id), UNIQUE(media_id, elderly_id) -- 每个老人对每个媒体只能有一个反馈 ) ''') # 家属留言表 cursor.execute(''' CREATE TABLE IF NOT EXISTS family_messages ( id INTEGER PRIMARY KEY AUTOINCREMENT, family_id TEXT NOT NULL, -- 家庭组ID content TEXT NOT NULL, -- 留言内容 sender_name TEXT NOT NULL, -- 发送者姓名 sender_relation TEXT NOT NULL, -- 发送者称呼(儿子、女儿、孙女等) scheduled_time TIMESTAMP NOT NULL, -- 预约播报时间 played INTEGER DEFAULT 0, -- 是否已播放:0=未播放,1=已播放 played_at TIMESTAMP, -- 实际播报时间 liked INTEGER DEFAULT 0, -- 老人是否点赞:0=未点赞,1=已点赞 is_active INTEGER DEFAULT 1, -- 是否有效 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') # 家属端消息/告警表(优化版) cursor.execute(''' CREATE TABLE IF NOT EXISTS family_alerts ( id 
INTEGER PRIMARY KEY AUTOINCREMENT, family_id TEXT NOT NULL, -- 家庭组ID elderly_id INTEGER, -- 老人用户ID(可选,用于关联具体老人) alert_type TEXT NOT NULL, -- 消息类型:sos_emergency, contact_family, medication, emotion, inactive, emergency level TEXT NOT NULL, -- 级别:low, medium, high title TEXT, -- 消息标题(简短概要) message TEXT NOT NULL, -- 消息详细内容 metadata TEXT, -- 额外元数据,JSON格式,如 {"location": "客厅", "device": "平板"} source TEXT DEFAULT 'elderly', -- 消息来源:elderly(老人端), system(系统自动), family(家属端) handled INTEGER DEFAULT 0, -- 是否已处理:0=未处理,1=已处理 handled_at TIMESTAMP, -- 处理时间 handled_by INTEGER, -- 处理人用户ID reply_message TEXT, -- 家属回复内容 read INTEGER DEFAULT 0, -- 是否已读:0=未读,1=已读 read_at TIMESTAMP, -- 阅读时间 is_active INTEGER DEFAULT 1, -- 是否有效 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (elderly_id) REFERENCES users(id), FOREIGN KEY (handled_by) REFERENCES users(id) ) ''') # 情绪记录表 cursor.execute(''' CREATE TABLE IF NOT EXISTS mood_records ( id INTEGER PRIMARY KEY AUTOINCREMENT, family_id TEXT NOT NULL, -- 家庭组ID elderly_id INTEGER, -- 老人用户ID mood_type TEXT NOT NULL, -- 情绪类型:happy(开心), calm(平静), sad(难过), anxious(焦虑), angry(生气), tired(疲惫) mood_score INTEGER DEFAULT 5, -- 情绪分数 1-10,数字越大越积极 note TEXT, -- 备注说明 source TEXT DEFAULT 'manual', -- 来源:manual(手动记录), ai_detect(AI检测), voice(语音分析) trigger_event TEXT, -- 触发事件,如 '看了家人照片', '完成了散步' location TEXT, -- 记录地点 weather TEXT, -- 天气情况 recorded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, -- 记录时间 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (elderly_id) REFERENCES users(id) ) ''') # 情绪记录索引 cursor.execute(''' CREATE INDEX IF NOT EXISTS idx_mood_records_family_id ON mood_records(family_id) ''') cursor.execute(''' CREATE INDEX IF NOT EXISTS idx_mood_records_elderly_id ON mood_records(elderly_id) ''') cursor.execute(''' CREATE INDEX IF NOT EXISTS idx_mood_records_recorded_at ON mood_records(recorded_at DESC) ''') # 创建索引以提高查询性能 cursor.execute(''' CREATE INDEX IF NOT EXISTS 
idx_family_alerts_family_id ON family_alerts(family_id) ''') cursor.execute(''' CREATE INDEX IF NOT EXISTS idx_family_alerts_created_at ON family_alerts(created_at DESC) ''') cursor.execute(''' CREATE INDEX IF NOT EXISTS idx_family_alerts_handled ON family_alerts(handled, created_at DESC) ''') conn.commit() conn.close() # ==================== 家人端 API ==================== @app.route('/api/family/schedules', methods=['GET']) def get_family_schedules(): """获取家庭所有日程""" family_id = request.args.get('family_id') if not family_id: return jsonify({'error': '缺少family_id参数'}), 400 conn = get_db() cursor = conn.cursor() cursor.execute(''' SELECT s.*, u.name as creator_name FROM schedules s LEFT JOIN users u ON s.created_by = u.id WHERE s.family_id = ? AND s.is_active = 1 ORDER BY s.schedule_time DESC ''', (family_id,)) schedules = [dict(row) for row in cursor.fetchall()] conn.close() return jsonify({'schedules': schedules}) @app.route('/api/family/schedules', methods=['POST']) def create_schedule(): """创建新日程""" data = request.json required_fields = ['family_id', 'title', 'schedule_time'] if not all(field in data for field in required_fields): return jsonify({'error': '缺少必需字段'}), 400 conn = get_db() cursor = conn.cursor() cursor.execute(''' INSERT INTO schedules ( family_id, title, description, schedule_type, schedule_time, repeat_type, repeat_days, auto_remind, created_by ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 
''', ( data['family_id'], data['title'], data.get('description', ''), data.get('schedule_type', 'other'), data['schedule_time'], data.get('repeat_type', 'once'), data.get('repeat_days', ''), data.get('auto_remind', 1), data.get('created_by') )) schedule_id = cursor.lastrowid conn.commit() conn.close() return jsonify({'success': True, 'schedule_id': schedule_id}), 201 @app.route('/api/family/schedules/', methods=['PUT']) def update_schedule(schedule_id): """更新日程""" data = request.json conn = get_db() cursor = conn.cursor() # 构建更新语句 update_fields = [] params = [] for field in ['title', 'description', 'schedule_type', 'schedule_time', 'repeat_type', 'repeat_days', 'auto_remind', 'status']: if field in data: update_fields.append(f"{field} = ?") params.append(data[field]) if not update_fields: return jsonify({'error': '没有要更新的字段'}), 400 update_fields.append("updated_at = CURRENT_TIMESTAMP") params.append(schedule_id) cursor.execute(f''' UPDATE schedules SET {', '.join(update_fields)} WHERE id = ? ''', params) conn.commit() conn.close() return jsonify({'success': True}) @app.route('/api/family/schedules/', methods=['DELETE']) def delete_schedule(schedule_id): """删除日程(软删除)""" conn = get_db() cursor = conn.cursor() cursor.execute(''' UPDATE schedules SET is_active = 0, updated_at = CURRENT_TIMESTAMP WHERE id = ? 
''', (schedule_id,)) conn.commit() conn.close() return jsonify({'success': True}) # ==================== 家属端消息/告警 API ==================== @app.route('/api/family/alerts', methods=['GET']) def get_family_alerts(): """获取家庭所有消息/告警""" family_id = request.args.get('family_id') status = request.args.get('status') # all, unhandled, handled handled = request.args.get('handled') # true/false 布尔值 read = request.args.get('read') # true/false 布尔值 alert_type = request.args.get('alert_type') # 消息类型 elderly_id = request.args.get('elderly_id', type=int) # 老人ID level = request.args.get('level') # low, medium, high limit = request.args.get('limit', 100, type=int) offset = request.args.get('offset', 0, type=int) if not family_id: return jsonify({'error': '缺少family_id参数'}), 400 conn = get_db() cursor = conn.cursor() # 构建查询条件 conditions = ['a.family_id = ?', 'a.is_active = 1'] params = [family_id] # 排除媒体展示事件(这些只用于老人端轮询,不应出现在家属端通知列表) conditions.append("a.alert_type != 'media_display'") # 支持status参数(兼容旧版) if status == 'unhandled': conditions.append('a.handled = 0') elif status == 'handled': conditions.append('a.handled = 1') # 支持handled参数(布尔值) if handled is not None: if handled.lower() == 'true': conditions.append('a.handled = 1') elif handled.lower() == 'false': conditions.append('a.handled = 0') # 支持read参数(布尔值) if read is not None: if read.lower() == 'true': conditions.append('a.read = 1') elif read.lower() == 'false': conditions.append('a.read = 0') # 支持alert_type参数 if alert_type: conditions.append('a.alert_type = ?') params.append(alert_type) # 支持elderly_id参数 if elderly_id: conditions.append('a.elderly_id = ?') params.append(elderly_id) if level: conditions.append('a.level = ?') params.append(level) where_clause = ' AND '.join(conditions) # 查询总数 cursor.execute(f''' SELECT COUNT(*) as total FROM family_alerts a WHERE {where_clause} ''', params) total = cursor.fetchone()['total'] # 查询数据(包含老人信息) cursor.execute(f''' SELECT a.*, u.name as elderly_name, h.name as handler_name FROM 
family_alerts a LEFT JOIN users u ON a.elderly_id = u.id LEFT JOIN users h ON a.handled_by = h.id WHERE {where_clause} ORDER BY a.created_at DESC LIMIT ? OFFSET ? ''', params + [limit, offset]) alerts = [] for row in cursor.fetchall(): alert = dict(row) # 转换布尔值 alert['handled'] = bool(alert['handled']) alert['read'] = bool(alert['read']) # 解析元数据JSON if alert['metadata']: try: alert['metadata'] = json.loads(alert['metadata']) except: alert['metadata'] = {} alerts.append(alert) conn.close() return jsonify({ 'alerts': alerts, 'total': total, 'limit': limit, 'offset': offset }) @app.route('/api/family/alerts', methods=['POST']) def create_alert(): """创建新消息/告警(由老人端或系统触发)""" data = request.json required_fields = ['family_id', 'alert_type', 'level', 'message'] if not all(field in data for field in required_fields): return jsonify({'error': '缺少必需字段'}), 400 conn = get_db() cursor = conn.cursor() # 处理元数据 metadata = data.get('metadata', {}) metadata_json = json.dumps(metadata) if metadata else None cursor.execute(''' INSERT INTO family_alerts ( family_id, elderly_id, alert_type, level, title, message, metadata, source ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) ''', ( data['family_id'], data.get('elderly_id'), data['alert_type'], data['level'], data.get('title'), data['message'], metadata_json, data.get('source', 'elderly') )) alert_id = cursor.lastrowid conn.commit() conn.close() return jsonify({'success': True, 'alert_id': alert_id}), 201 @app.route('/api/family/alerts//handle', methods=['POST']) def handle_alert(alert_id): """标记消息/告警为已处理""" data = request.json or {} conn = get_db() cursor = conn.cursor() cursor.execute(''' UPDATE family_alerts SET handled = 1, handled_at = CURRENT_TIMESTAMP, handled_by = ?, reply_message = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ? 
''', ( data.get('handled_by'), data.get('reply_message'), alert_id )) conn.commit() conn.close() return jsonify({'success': True}) @app.route('/api/family/alerts//read', methods=['POST']) def mark_alert_read(alert_id): """标记消息为已读""" conn = get_db() cursor = conn.cursor() cursor.execute(''' UPDATE family_alerts SET read = 1, read_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP WHERE id = ? ''', (alert_id,)) conn.commit() conn.close() return jsonify({'success': True}) @app.route('/api/family/alerts//reply', methods=['POST']) def reply_alert(alert_id): """家属回复消息""" data = request.json if not data or 'reply_message' not in data: return jsonify({'error': '缺少reply_message字段'}), 400 conn = get_db() cursor = conn.cursor() cursor.execute(''' UPDATE family_alerts SET reply_message = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ? ''', (data['reply_message'], alert_id)) conn.commit() conn.close() return jsonify({'success': True}) @app.route('/api/family/alerts/', methods=['DELETE']) def delete_alert(alert_id): """删除消息/告警(软删除)""" conn = get_db() cursor = conn.cursor() cursor.execute(''' UPDATE family_alerts SET is_active = 0, updated_at = CURRENT_TIMESTAMP WHERE id = ? ''', (alert_id,)) conn.commit() conn.close() return jsonify({'success': True}) @app.route('/api/family/alerts/stats', methods=['GET']) def get_alerts_stats(): """获取消息统计数据""" family_id = request.args.get('family_id') if not family_id: return jsonify({'error': '缺少family_id参数'}), 400 conn = get_db() cursor = conn.cursor() # 统计各级别消息数量(排除媒体展示事件) cursor.execute(''' SELECT level, COUNT(*) as count FROM family_alerts WHERE family_id = ? AND is_active = 1 AND alert_type != 'media_display' GROUP BY level ''', (family_id,)) level_stats = {row['level']: row['count'] for row in cursor.fetchall()} # 统计各类型消息数量(排除媒体展示事件) cursor.execute(''' SELECT alert_type, COUNT(*) as count FROM family_alerts WHERE family_id = ? 
AND is_active = 1 AND alert_type != 'media_display' GROUP BY alert_type ''', (family_id,)) type_stats = {row['alert_type']: row['count'] for row in cursor.fetchall()} # 统计已处理/未处理(排除媒体展示事件) cursor.execute(''' SELECT COUNT(CASE WHEN handled = 0 THEN 1 END) as unhandled, COUNT(CASE WHEN handled = 1 THEN 1 END) as handled, COUNT(CASE WHEN read = 0 THEN 1 END) as unread FROM family_alerts WHERE family_id = ? AND is_active = 1 AND alert_type != 'media_display' ''', (family_id,)) status_stats = dict(cursor.fetchone()) # 今日新增消息数(排除媒体展示事件) cursor.execute(''' SELECT COUNT(*) as today_count FROM family_alerts WHERE family_id = ? AND is_active = 1 AND alert_type != 'media_display' AND DATE(created_at) = DATE('now') ''', (family_id,)) today_count = cursor.fetchone()['today_count'] conn.close() return jsonify({ 'level_stats': level_stats, 'type_stats': type_stats, 'status_stats': status_stats, 'today_count': today_count }) # ==================== 家属留言 API ==================== @app.route('/api/family/messages', methods=['GET']) def get_family_messages(): """获取家庭所有留言""" family_id = request.args.get('family_id') if not family_id: return jsonify({'error': '缺少family_id参数'}), 400 conn = get_db() cursor = conn.cursor() cursor.execute(''' SELECT * FROM family_messages WHERE family_id = ? 
AND is_active = 1 ORDER BY created_at DESC ''', (family_id,)) messages = [] for row in cursor.fetchall(): msg = dict(row) # 转换布尔值 msg['played'] = bool(msg['played']) msg['liked'] = bool(msg['liked']) # 转换UTC时间为北京时间 msg['created_at'] = utc_to_beijing(msg['created_at']) msg['updated_at'] = utc_to_beijing(msg['updated_at']) if msg.get('played_at'): msg['played_at'] = utc_to_beijing(msg['played_at']) messages.append(msg) conn.close() return jsonify({'messages': messages}) @app.route('/api/family/messages', methods=['POST']) def create_message(): """创建新留言""" data = request.json required_fields = ['family_id', 'content', 'sender_name', 'sender_relation', 'scheduled_time'] if not all(field in data for field in required_fields): return jsonify({'error': '缺少必需字段'}), 400 conn = get_db() cursor = conn.cursor() cursor.execute(''' INSERT INTO family_messages ( family_id, content, sender_name, sender_relation, scheduled_time ) VALUES (?, ?, ?, ?, ?) ''', ( data['family_id'], data['content'], data['sender_name'], data['sender_relation'], data['scheduled_time'] )) message_id = cursor.lastrowid conn.commit() conn.close() return jsonify({'success': True, 'message_id': message_id}), 201 @app.route('/api/family/messages/', methods=['DELETE']) def delete_message(message_id): """删除留言(软删除)""" conn = get_db() cursor = conn.cursor() cursor.execute(''' UPDATE family_messages SET is_active = 0, updated_at = CURRENT_TIMESTAMP WHERE id = ? ''', (message_id,)) conn.commit() conn.close() return jsonify({'success': True}) # ==================== 老人端留言 API ==================== @app.route('/api/elderly/messages', methods=['GET']) def get_elderly_messages(): """获取老人端的留言列表(按预约时间排序)""" family_id = request.args.get('family_id') if not family_id: return jsonify({'error': '缺少family_id参数'}), 400 conn = get_db() cursor = conn.cursor() cursor.execute(''' SELECT * FROM family_messages WHERE family_id = ? 
AND is_active = 1 ORDER BY scheduled_time ASC ''', (family_id,)) messages = [] for row in cursor.fetchall(): msg = dict(row) msg['played'] = bool(msg['played']) msg['liked'] = bool(msg['liked']) # 转换UTC时间为北京时间 msg['created_at'] = utc_to_beijing(msg['created_at']) msg['updated_at'] = utc_to_beijing(msg['updated_at']) if msg.get('played_at'): msg['played_at'] = utc_to_beijing(msg['played_at']) messages.append(msg) conn.close() return jsonify({'messages': messages}) @app.route('/api/elderly/messages/pending', methods=['GET']) def get_pending_messages(): """获取待播放的留言(预约时间已到但未播放的)""" family_id = request.args.get('family_id') if not family_id: return jsonify({'error': '缺少family_id参数'}), 400 conn = get_db() cursor = conn.cursor() # 调试日志 - 使用北京时间 beijing_now = get_beijing_time() current_time = beijing_now.strftime('%Y-%m-%d %H:%M:%S') print(f"[DEBUG] 当前北京时间: {current_time}") cursor.execute(''' SELECT * FROM family_messages WHERE family_id = ? AND is_active = 1 AND played = 0 ORDER BY scheduled_time ASC ''', (family_id,)) # 获取所有未播放的消息 all_messages = [] for row in cursor.fetchall(): msg = dict(row) print(f"[DEBUG] 留言 ID: {msg['id']}, 预约时间: {msg['scheduled_time']}") all_messages.append(msg) # 在 Python 中进行时间比较(更可靠) - 使用北京时间 now = beijing_now.replace(tzinfo=None) # 移除时区信息以便比较 messages = [] for msg in all_messages: try: # 处理各种可能的时间格式 scheduled_str = msg['scheduled_time'] # 移除 'T',统一为空格分隔 scheduled_str = scheduled_str.replace('T', ' ') # 如果没有秒数,添加 :00 if len(scheduled_str) == 16: # YYYY-MM-DD HH:MM scheduled_str += ':00' scheduled_time = datetime.strptime(scheduled_str, '%Y-%m-%d %H:%M:%S') print(f"[DEBUG] 解析后时间: {scheduled_time}, 当前北京时间: {now}, 已到期: {scheduled_time <= now}") if scheduled_time <= now: msg['played'] = bool(msg['played']) msg['liked'] = bool(msg['liked']) # 转换UTC时间为北京时间 msg['created_at'] = utc_to_beijing(msg['created_at']) msg['updated_at'] = utc_to_beijing(msg['updated_at']) if msg.get('played_at'): msg['played_at'] = utc_to_beijing(msg['played_at']) 
messages.append(msg) except Exception as e: print(f"[DEBUG] 时间解析错误: {e}, 原始值: {msg['scheduled_time']}") continue conn.close() print(f"[DEBUG] 找到 {len(messages)} 条待播放留言") return jsonify({'messages': messages}) @app.route('/api/elderly/messages//play', methods=['POST']) def play_message(message_id): """标记留言为已播放""" conn = get_db() cursor = conn.cursor() cursor.execute(''' UPDATE family_messages SET played = 1, played_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP WHERE id = ? ''', (message_id,)) conn.commit() conn.close() return jsonify({'success': True}) @app.route('/api/elderly/messages//like', methods=['POST']) def like_message(message_id): """老人点赞留言""" conn = get_db() cursor = conn.cursor() cursor.execute(''' UPDATE family_messages SET liked = 1, updated_at = CURRENT_TIMESTAMP WHERE id = ? ''', (message_id,)) conn.commit() conn.close() return jsonify({'success': True}) @app.route('/api/elderly/messages//unlike', methods=['POST']) def unlike_message(message_id): """老人取消点赞""" conn = get_db() cursor = conn.cursor() cursor.execute(''' UPDATE family_messages SET liked = 0, updated_at = CURRENT_TIMESTAMP WHERE id = ? 
''', (message_id,)) conn.commit() conn.close() return jsonify({'success': True}) # ==================== 老人端消息/告警 API ==================== @app.route('/api/elderly/alerts', methods=['POST']) def create_elderly_alert(): """老人端创建消息(如SOS、联系家人)""" data = request.json required_fields = ['family_id', 'alert_type', 'level', 'message'] if not all(field in data for field in required_fields): return jsonify({'error': '缺少必需字段'}), 400 conn = get_db() cursor = conn.cursor() # 处理元数据 metadata = data.get('metadata', {}) metadata_json = json.dumps(metadata) if metadata else None cursor.execute(''' INSERT INTO family_alerts ( family_id, elderly_id, alert_type, level, title, message, metadata, source ) VALUES (?, ?, ?, ?, ?, ?, ?, 'elderly') ''', ( data['family_id'], data.get('elderly_id'), data['alert_type'], data['level'], data.get('title'), data['message'], metadata_json )) alert_id = cursor.lastrowid conn.commit() conn.close() return jsonify({'success': True, 'alert_id': alert_id}), 201 @app.route('/api/elderly/alerts/replies', methods=['GET']) def get_elderly_alert_replies(): """获取家属对老人消息的回复""" family_id = request.args.get('family_id') elderly_id = request.args.get('elderly_id') if not family_id: return jsonify({'error': '缺少family_id参数'}), 400 conn = get_db() cursor = conn.cursor() conditions = ['family_id = ?', 'is_active = 1', 'reply_message IS NOT NULL'] params = [family_id] if elderly_id: conditions.append('elderly_id = ?') params.append(elderly_id) where_clause = ' AND '.join(conditions) cursor.execute(f''' SELECT id, alert_type, level, message, reply_message, handled_at, created_at FROM family_alerts WHERE {where_clause} ORDER BY handled_at DESC LIMIT 10 ''', params) replies = [dict(row) for row in cursor.fetchall()] conn.close() return jsonify({'replies': replies}) # ==================== 情绪记录 API ==================== @app.route('/api/elderly/moods', methods=['POST']) def create_mood_record(): """老人端创建情绪记录""" data = request.json required_fields = ['family_id', 'mood_type'] 
if not all(field in data for field in required_fields): return jsonify({'error': '缺少必需字段'}), 400 # 验证情绪类型 valid_moods = ['happy', 'calm', 'sad', 'anxious', 'angry', 'tired'] if data['mood_type'] not in valid_moods: return jsonify({'error': '无效的情绪类型'}), 400 # 验证情绪分数范围 mood_score = data.get('mood_score', 5) if not (1 <= mood_score <= 10): return jsonify({'error': '情绪分数必须在1-10之间'}), 400 conn = get_db() cursor = conn.cursor() cursor.execute(''' INSERT INTO mood_records ( family_id, elderly_id, mood_type, mood_score, note, source, trigger_event, location, weather, recorded_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', ( data['family_id'], data.get('elderly_id'), data['mood_type'], mood_score, data.get('note', ''), data.get('source', 'manual'), data.get('trigger_event', ''), data.get('location', ''), data.get('weather', ''), data.get('recorded_at', datetime.now().strftime('%Y-%m-%d %H:%M:%S')) )) record_id = cursor.lastrowid conn.commit() conn.close() return jsonify({'success': True, 'record_id': record_id}), 201 @app.route('/api/elderly/moods', methods=['GET']) def get_elderly_moods(): """获取老人的情绪记录列表""" family_id = request.args.get('family_id') elderly_id = request.args.get('elderly_id') limit = request.args.get('limit', 50, type=int) offset = request.args.get('offset', 0, type=int) if not family_id: return jsonify({'error': '缺少family_id参数'}), 400 conn = get_db() cursor = conn.cursor() conditions = ['family_id = ?'] params = [family_id] if elderly_id: conditions.append('elderly_id = ?') params.append(elderly_id) where_clause = ' AND '.join(conditions) # 查询总数 cursor.execute(f''' SELECT COUNT(*) as total FROM mood_records WHERE {where_clause} ''', params) total = cursor.fetchone()['total'] # 查询数据 cursor.execute(f''' SELECT * FROM mood_records WHERE {where_clause} ORDER BY recorded_at DESC LIMIT ? OFFSET ? 
''', params + [limit, offset]) records = [dict(row) for row in cursor.fetchall()] conn.close() return jsonify({ 'records': records, 'total': total, 'limit': limit, 'offset': offset }) @app.route('/api/elderly/moods/today', methods=['GET']) def get_today_moods(): """获取老人今日的情绪记录""" family_id = request.args.get('family_id') elderly_id = request.args.get('elderly_id') if not family_id: return jsonify({'error': '缺少family_id参数'}), 400 conn = get_db() cursor = conn.cursor() today = datetime.now().strftime('%Y-%m-%d') conditions = ['family_id = ?', 'DATE(recorded_at) = DATE(?)'] params = [family_id, today] if elderly_id: conditions.append('elderly_id = ?') params.append(elderly_id) where_clause = ' AND '.join(conditions) cursor.execute(f''' SELECT * FROM mood_records WHERE {where_clause} ORDER BY recorded_at DESC ''', params) records = [dict(row) for row in cursor.fetchall()] conn.close() return jsonify({'records': records}) @app.route('/api/elderly/moods/latest', methods=['GET']) def get_latest_mood(): """获取老人最新的情绪记录""" family_id = request.args.get('family_id') elderly_id = request.args.get('elderly_id') if not family_id: return jsonify({'error': '缺少family_id参数'}), 400 conn = get_db() cursor = conn.cursor() conditions = ['family_id = ?'] params = [family_id] if elderly_id: conditions.append('elderly_id = ?') params.append(elderly_id) where_clause = ' AND '.join(conditions) cursor.execute(f''' SELECT * FROM mood_records WHERE {where_clause} ORDER BY recorded_at DESC LIMIT 1 ''', params) row = cursor.fetchone() conn.close() if row: return jsonify({'record': dict(row)}) else: return jsonify({'record': None}) # ==================== 家属端情绪记录 API ==================== @app.route('/api/family/moods', methods=['GET']) def get_family_moods(): """家属端获取老人的情绪记录""" family_id = request.args.get('family_id') elderly_id = request.args.get('elderly_id') mood_type = request.args.get('mood_type') start_date = request.args.get('start_date') end_date = request.args.get('end_date') limit = 
request.args.get('limit', 100, type=int) offset = request.args.get('offset', 0, type=int) if not family_id: return jsonify({'error': '缺少family_id参数'}), 400 conn = get_db() cursor = conn.cursor() conditions = ['m.family_id = ?'] params = [family_id] if elderly_id: conditions.append('m.elderly_id = ?') params.append(elderly_id) if mood_type: conditions.append('m.mood_type = ?') params.append(mood_type) if start_date: conditions.append('DATE(m.recorded_at) >= DATE(?)') params.append(start_date) if end_date: conditions.append('DATE(m.recorded_at) <= DATE(?)') params.append(end_date) where_clause = ' AND '.join(conditions) # 查询总数 cursor.execute(f''' SELECT COUNT(*) as total FROM mood_records m WHERE {where_clause} ''', params) total = cursor.fetchone()['total'] # 查询数据(包含老人信息) cursor.execute(f''' SELECT m.*, u.name as elderly_name FROM mood_records m LEFT JOIN users u ON m.elderly_id = u.id WHERE {where_clause} ORDER BY m.recorded_at DESC LIMIT ? OFFSET ? ''', params + [limit, offset]) records = [dict(row) for row in cursor.fetchall()] conn.close() return jsonify({ 'records': records, 'total': total, 'limit': limit, 'offset': offset }) @app.route('/api/family/moods/stats', methods=['GET']) def get_mood_stats(): """获取情绪统计数据""" family_id = request.args.get('family_id') elderly_id = request.args.get('elderly_id') days = request.args.get('days', 7, type=int) # 统计最近N天 if not family_id: return jsonify({'error': '缺少family_id参数'}), 400 conn = get_db() cursor = conn.cursor() conditions = ['family_id = ?'] params = [family_id] if elderly_id: conditions.append('elderly_id = ?') params.append(elderly_id) # 添加时间范围条件 conditions.append(f"DATE(recorded_at) >= DATE('now', '-{days} days')") where_clause = ' AND '.join(conditions) # 按情绪类型统计 cursor.execute(f''' SELECT mood_type, COUNT(*) as count, AVG(mood_score) as avg_score FROM mood_records WHERE {where_clause} GROUP BY mood_type ORDER BY count DESC ''', params) mood_type_stats = [] for row in cursor.fetchall(): stat = dict(row) 
stat['avg_score'] = round(stat['avg_score'], 1) if stat['avg_score'] else 0 mood_type_stats.append(stat) # 按日期统计平均分数 cursor.execute(f''' SELECT DATE(recorded_at) as date, AVG(mood_score) as avg_score, COUNT(*) as count FROM mood_records WHERE {where_clause} GROUP BY DATE(recorded_at) ORDER BY date DESC ''', params) daily_stats = [] for row in cursor.fetchall(): stat = dict(row) stat['avg_score'] = round(stat['avg_score'], 1) if stat['avg_score'] else 0 daily_stats.append(stat) # 计算整体统计 cursor.execute(f''' SELECT COUNT(*) as total_records, AVG(mood_score) as avg_score, MAX(mood_score) as max_score, MIN(mood_score) as min_score FROM mood_records WHERE {where_clause} ''', params) overall = dict(cursor.fetchone()) overall['avg_score'] = round(overall['avg_score'], 1) if overall['avg_score'] else 0 # 今日记录数 cursor.execute(f''' SELECT COUNT(*) as today_count FROM mood_records WHERE {where_clause.replace(f"DATE(recorded_at) >= DATE('now', '-{days} days')", "DATE(recorded_at) = DATE('now')")} ''', params) today_count = cursor.fetchone()['today_count'] conn.close() return jsonify({ 'mood_type_stats': mood_type_stats, 'daily_stats': daily_stats, 'overall': overall, 'today_count': today_count, 'days': days }) @app.route('/api/family/moods/trend', methods=['GET']) def get_mood_trend(): """获取情绪趋势数据""" family_id = request.args.get('family_id') elderly_id = request.args.get('elderly_id') days = request.args.get('days', 30, type=int) if not family_id: return jsonify({'error': '缺少family_id参数'}), 400 conn = get_db() cursor = conn.cursor() conditions = ['family_id = ?'] params = [family_id] if elderly_id: conditions.append('elderly_id = ?') params.append(elderly_id) conditions.append(f"DATE(recorded_at) >= DATE('now', '-{days} days')") where_clause = ' AND '.join(conditions) # 按日期获取情绪趋势 cursor.execute(f''' SELECT DATE(recorded_at) as date, mood_type, AVG(mood_score) as avg_score, COUNT(*) as count FROM mood_records WHERE {where_clause} GROUP BY DATE(recorded_at), mood_type ORDER BY 
date ASC, count DESC ''', params) trend_data = [] for row in cursor.fetchall(): item = dict(row) item['avg_score'] = round(item['avg_score'], 1) if item['avg_score'] else 0 trend_data.append(item) conn.close() return jsonify({ 'trend': trend_data, 'days': days }) # ==================== 老人端 API ==================== @app.route('/api/elderly/schedules/today', methods=['GET']) def get_today_schedules(): """获取老人今日日程""" family_id = request.args.get('family_id') if not family_id: return jsonify({'error': '缺少family_id参数'}), 400 conn = get_db() cursor = conn.cursor() # 获取今天的日期范围 today = datetime.now().strftime('%Y-%m-%d') cursor.execute(''' SELECT * FROM schedules WHERE family_id = ? AND is_active = 1 AND ( (repeat_type = 'once' AND DATE(schedule_time) = DATE(?)) OR repeat_type = 'daily' OR (repeat_type = 'weekly' AND CAST(strftime('%w', ?) AS INTEGER) IN ( SELECT value FROM json_each(repeat_days) )) ) ORDER BY TIME(schedule_time) ''', (family_id, today, today)) schedules = [dict(row) for row in cursor.fetchall()] conn.close() return jsonify({'schedules': schedules}) @app.route('/api/elderly/schedules/upcoming', methods=['GET']) def get_upcoming_schedules(): """获取即将到来的日程(下一小时内)""" family_id = request.args.get('family_id') elderly_id = request.args.get('elderly_id') if not family_id: return jsonify({'error': '缺少family_id参数'}), 400 conn = get_db() cursor = conn.cursor() cursor.execute(''' SELECT * FROM schedules WHERE family_id = ? AND is_active = 1 AND datetime(schedule_time) BETWEEN datetime('now') AND datetime('now', '+1 hour') ORDER BY schedule_time ''', (family_id,)) schedules = [dict(row) for row in cursor.fetchall()] conn.close() return jsonify({'schedules': schedules}) @app.route('/api/elderly/reminders//complete', methods=['POST']) def complete_reminder(reminder_id): """标记提醒为已完成""" conn = get_db() cursor = conn.cursor() cursor.execute(''' UPDATE reminders SET status = 'completed', completed_at = CURRENT_TIMESTAMP WHERE id = ? 
''', (reminder_id,)) conn.commit() conn.close() return jsonify({'success': True}) @app.route('/api/elderly/reminders//dismiss', methods=['POST']) def dismiss_reminder(reminder_id): """忽略提醒""" conn = get_db() cursor = conn.cursor() cursor.execute(''' UPDATE reminders SET status = 'dismissed' WHERE id = ? ''', (reminder_id,)) conn.commit() conn.close() return jsonify({'success': True}) @app.route('/api/elderly/schedules//status', methods=['POST']) def update_schedule_status(schedule_id): """更新日程状态""" data = request.json status = data.get('status') # pending, completed, skipped, missed if status not in ['pending', 'completed', 'skipped', 'missed']: return jsonify({'error': '无效的状态值'}), 400 conn = get_db() cursor = conn.cursor() # 如果状态是 completed,记录完成时间 if status == 'completed': cursor.execute(''' UPDATE schedules SET status = ?, completed_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP WHERE id = ? ''', (status, schedule_id)) else: cursor.execute(''' UPDATE schedules SET status = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ? ''', (status, schedule_id)) conn.commit() conn.close() return jsonify({'success': True}) # ==================== 用户管理 API ==================== @app.route('/api/users', methods=['POST']) def create_user(): """创建用户""" data = request.json required_fields = ['user_type', 'name', 'family_id'] if not all(field in data for field in required_fields): return jsonify({'error': '缺少必需字段'}), 400 conn = get_db() cursor = conn.cursor() cursor.execute(''' INSERT INTO users (user_type, name, phone, family_id) VALUES (?, ?, ?, ?) ''', ( data['user_type'], data['name'], data.get('phone', ''), data['family_id'] )) user_id = cursor.lastrowid conn.commit() conn.close() return jsonify({'success': True, 'user_id': user_id}), 201 @app.route('/api/users/', methods=['GET']) def get_family_users(family_id): """获取家庭成员列表""" conn = get_db() cursor = conn.cursor() cursor.execute(''' SELECT * FROM users WHERE family_id = ? 
ORDER BY user_type, created_at ''', (family_id,)) users = [dict(row) for row in cursor.fetchall()] conn.close() return jsonify({'users': users}) # ==================== 媒体库 API ==================== @app.route('/api/family/media', methods=['POST']) def upload_media(): """家属端上传媒体文件""" # 检查是否有文件 if 'file' not in request.files: return jsonify({'error': '没有上传文件'}), 400 file = request.files['file'] if file.filename == '': return jsonify({'error': '文件名为空'}), 400 if not allowed_file(file.filename): return jsonify({'error': '不支持的文件类型'}), 400 # 获取其他表单数据 family_id = request.form.get('family_id') title = request.form.get('title') description = request.form.get('description', '') uploaded_by = request.form.get('uploaded_by') if not family_id or not title: return jsonify({'error': '缺少必需字段'}), 400 # 保存文件 filename = secure_filename(file.filename) timestamp = datetime.now().strftime('%Y%m%d%H%M%S') unique_filename = f"{timestamp}_{filename}" file_path = os.path.join(app.config['UPLOAD_FOLDER'], unique_filename) file.save(file_path) # 判断媒体类型 ext = filename.rsplit('.', 1)[1].lower() media_type = 'video' if ext in {'mp4', 'mov', 'avi'} else 'photo' # 获取文件大小 file_size = os.path.getsize(file_path) # 生成缩略图 thumbnail_path = None if media_type == 'video': thumbnail_path = generate_video_thumbnail(file_path, unique_filename) elif media_type == 'photo': thumbnail_path = generate_photo_thumbnail(file_path, unique_filename) # 插入数据库 conn = get_db() cursor = conn.cursor() cursor.execute(''' INSERT INTO media ( family_id, media_type, title, description, file_path, file_size, thumbnail_path, uploaded_by ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) ''', (family_id, media_type, title, description, file_path, file_size, thumbnail_path, uploaded_by)) media_id = cursor.lastrowid # 创建默认触发策略 cursor.execute(''' INSERT INTO media_policies (media_id, time_windows, moods, occasions, cooldown, priority) VALUES (?, ?, ?, ?, ?, ?) 
''', (media_id, '[]', '[]', '[]', 60, 5)) conn.commit() conn.close() return jsonify({ 'success': True, 'media_id': media_id, 'file_path': file_path, 'media_type': media_type }), 201 @app.route('/api/family/media', methods=['GET']) def get_family_media(): """获取家庭所有媒体列表""" family_id = request.args.get('family_id') if not family_id: return jsonify({'error': '缺少family_id参数'}), 400 conn = get_db() cursor = conn.cursor() # 获取媒体列表及其标签和策略 cursor.execute(''' SELECT m.*, p.time_windows, p.moods, p.occasions, p.cooldown, p.priority, p.play_count, p.last_played_at, GROUP_CONCAT(t.tag) as tags FROM media m LEFT JOIN media_policies p ON m.id = p.media_id LEFT JOIN media_tags t ON m.id = t.media_id WHERE m.family_id = ? AND m.is_active = 1 GROUP BY m.id ORDER BY m.created_at DESC ''', (family_id,)) media_list = [] for row in cursor.fetchall(): media_dict = dict(row) # 解析标签 if media_dict['tags']: media_dict['tags'] = media_dict['tags'].split(',') else: media_dict['tags'] = [] # 解析JSON字段 for field in ['time_windows', 'moods', 'occasions']: try: media_dict[field] = json.loads(media_dict[field]) if media_dict[field] else [] except: media_dict[field] = [] media_list.append(media_dict) conn.close() return jsonify({'media': media_list}) @app.route('/api/family/media/', methods=['GET']) def get_media_detail(media_id): """获取媒体详情""" conn = get_db() cursor = conn.cursor() cursor.execute(''' SELECT m.*, p.time_windows, p.moods, p.occasions, p.cooldown, p.priority, p.play_count, p.last_played_at FROM media m LEFT JOIN media_policies p ON m.id = p.media_id WHERE m.id = ? 
''', (media_id,)) row = cursor.fetchone() if not row: conn.close() return jsonify({'error': '媒体不存在'}), 404 media_dict = dict(row) # 获取标签 cursor.execute('SELECT tag FROM media_tags WHERE media_id = ?', (media_id,)) media_dict['tags'] = [row['tag'] for row in cursor.fetchall()] # 解析JSON字段 for field in ['time_windows', 'moods', 'occasions']: try: media_dict[field] = json.loads(media_dict[field]) if media_dict[field] else [] except: media_dict[field] = [] # 获取播放统计 cursor.execute(''' SELECT COUNT(*) as total_plays, SUM(CASE WHEN feedback_type = 'like' THEN 1 ELSE 0 END) as likes, SUM(CASE WHEN feedback_type = 'dislike' THEN 1 ELSE 0 END) as dislikes FROM media_play_history mph LEFT JOIN media_feedback mf ON mph.media_id = mf.media_id AND mph.elderly_id = mf.elderly_id WHERE mph.media_id = ? ''', (media_id,)) stats = dict(cursor.fetchone()) media_dict['statistics'] = stats conn.close() return jsonify(media_dict) @app.route('/api/family/media/', methods=['PUT']) def update_media(media_id): """更新媒体信息和触发策略""" data = request.json conn = get_db() cursor = conn.cursor() # 更新媒体基本信息 if 'title' in data or 'description' in data: update_fields = [] params = [] if 'title' in data: update_fields.append('title = ?') params.append(data['title']) if 'description' in data: update_fields.append('description = ?') params.append(data['description']) update_fields.append('updated_at = CURRENT_TIMESTAMP') params.append(media_id) cursor.execute(f''' UPDATE media SET {', '.join(update_fields)} WHERE id = ? ''', params) # 更新标签 if 'tags' in data: # 删除旧标签 cursor.execute('DELETE FROM media_tags WHERE media_id = ?', (media_id,)) # 添加新标签 for tag in data['tags']: cursor.execute(''' INSERT INTO media_tags (media_id, tag) VALUES (?, ?) 
''', (media_id, tag)) # 更新触发策略 policy_fields = ['time_windows', 'moods', 'occasions', 'cooldown', 'priority'] policy_updates = [] policy_params = [] for field in policy_fields: if field in data: policy_updates.append(f'{field} = ?') # JSON字段需要序列化 if field in ['time_windows', 'moods', 'occasions']: policy_params.append(json.dumps(data[field])) else: policy_params.append(data[field]) if policy_updates: policy_updates.append('updated_at = CURRENT_TIMESTAMP') policy_params.append(media_id) cursor.execute(f''' UPDATE media_policies SET {', '.join(policy_updates)} WHERE media_id = ? ''', policy_params) conn.commit() conn.close() return jsonify({'success': True}) @app.route('/api/family/media/', methods=['DELETE']) def delete_media(media_id): """删除媒体(软删除)""" conn = get_db() cursor = conn.cursor() cursor.execute(''' UPDATE media SET is_active = 0, updated_at = CURRENT_TIMESTAMP WHERE id = ? ''', (media_id,)) conn.commit() conn.close() return jsonify({'success': True}) # ==================== 老人端媒体 API ==================== @app.route('/api/elderly/media/recommended', methods=['GET']) def get_recommended_media(): """获取推荐媒体(基于时段、心境、场合等策略)""" family_id = request.args.get('family_id') elderly_id = request.args.get('elderly_id') current_mood = request.args.get('mood', '') # 当前心境 occasion = request.args.get('occasion', '') # 特殊场合 if not family_id: return jsonify({'error': '缺少family_id参数'}), 400 conn = get_db() cursor = conn.cursor() # 获取当前时间 now = datetime.now() current_time = now.strftime('%H:%M') # 查询符合条件的媒体 cursor.execute(''' SELECT m.*, p.time_windows, p.moods, p.occasions, p.cooldown, p.priority, p.play_count, p.last_played_at, GROUP_CONCAT(t.tag) as tags FROM media m INNER JOIN media_policies p ON m.id = p.media_id LEFT JOIN media_tags t ON m.id = t.media_id WHERE m.family_id = ? 
AND m.is_active = 1 GROUP BY m.id ORDER BY p.priority DESC, p.play_count ASC ''', (family_id,)) recommended = [] for row in cursor.fetchall(): media_dict = dict(row) # 解析JSON字段 time_windows = json.loads(media_dict['time_windows']) if media_dict['time_windows'] else [] moods = json.loads(media_dict['moods']) if media_dict['moods'] else [] occasions = json.loads(media_dict['occasions']) if media_dict['occasions'] else [] # 检查冷却时间 if media_dict['last_played_at']: last_played = datetime.fromisoformat(media_dict['last_played_at']) cooldown_minutes = media_dict['cooldown'] if now - last_played < timedelta(minutes=cooldown_minutes): continue # 还在冷却期,跳过 # 检查时段匹配 time_match = not time_windows # 如果没有设置时段,默认匹配 for window in time_windows: if '-' in window: start, end = window.split('-') if start <= current_time <= end: time_match = True break if not time_match: continue # 检查心境匹配 mood_match = not moods or not current_mood or current_mood in moods if not mood_match: continue # 检查场合匹配 occasion_match = not occasions or not occasion or occasion in occasions if not occasion_match: continue # 解析标签 if media_dict['tags']: media_dict['tags'] = media_dict['tags'].split(',') else: media_dict['tags'] = [] recommended.append(media_dict) conn.close() return jsonify({'media': recommended}) @app.route('/api/elderly/media//play', methods=['POST']) def record_media_play(media_id): """记录媒体播放""" data = request.json elderly_id = data.get('elderly_id') duration_watched = data.get('duration_watched', 0) completed = data.get('completed', 0) triggered_by = data.get('triggered_by', 'manual') mood_before = data.get('mood_before', '') mood_after = data.get('mood_after', '') if not elderly_id: return jsonify({'error': '缺少elderly_id'}), 400 conn = get_db() cursor = conn.cursor() # 记录播放历史 cursor.execute(''' INSERT INTO media_play_history ( media_id, elderly_id, duration_watched, completed, triggered_by, mood_before, mood_after ) VALUES (?, ?, ?, ?, ?, ?, ?) 
''', (media_id, elderly_id, duration_watched, completed, triggered_by, mood_before, mood_after)) # 更新媒体策略的播放次数和最后播放时间 cursor.execute(''' UPDATE media_policies SET play_count = play_count + 1, last_played_at = CURRENT_TIMESTAMP WHERE media_id = ? ''', (media_id,)) conn.commit() conn.close() return jsonify({'success': True}) @app.route('/api/elderly/media//feedback', methods=['POST']) def submit_media_feedback(media_id): """提交媒体反馈(点赞/点踩)""" data = request.json elderly_id = data.get('elderly_id') feedback_type = data.get('feedback_type') # 'like' 或 'dislike' if not elderly_id or feedback_type not in ['like', 'dislike']: return jsonify({'error': '参数错误'}), 400 conn = get_db() cursor = conn.cursor() # 使用 INSERT OR REPLACE 来处理重复反馈 cursor.execute(''' INSERT OR REPLACE INTO media_feedback (media_id, elderly_id, feedback_type) VALUES (?, ?, ?) ''', (media_id, elderly_id, feedback_type)) conn.commit() conn.close() return jsonify({'success': True}) @app.route('/api/elderly/media/history', methods=['GET']) def get_media_history(): """获取媒体播放历史""" elderly_id = request.args.get('elderly_id') limit = request.args.get('limit', 50) if not elderly_id: return jsonify({'error': '缺少elderly_id参数'}), 400 conn = get_db() cursor = conn.cursor() cursor.execute(''' SELECT mph.*, m.title, m.media_type, m.file_path, mf.feedback_type FROM media_play_history mph INNER JOIN media m ON mph.media_id = m.id LEFT JOIN media_feedback mf ON mph.media_id = mf.media_id AND mph.elderly_id = mf.elderly_id WHERE mph.elderly_id = ? ORDER BY mph.played_at DESC LIMIT ? 
''', (elderly_id, limit)) history = [dict(row) for row in cursor.fetchall()] conn.close() return jsonify({'history': history}) @app.route('/api/family/media/recent-plays', methods=['GET']) def get_recent_plays(): """获取最近播放的媒体(家属端查看)""" family_id = request.args.get('family_id') limit = request.args.get('limit', 10) if not family_id: return jsonify({'error': '缺少family_id参数'}), 400 conn = get_db() cursor = conn.cursor() cursor.execute(''' SELECT m.id, m.title, m.media_type, m.thumbnail_path, mph.played_at, COUNT(CASE WHEN mf.feedback_type = 'like' THEN 1 END) as likes, COUNT(CASE WHEN mf.feedback_type = 'dislike' THEN 1 END) as dislikes FROM media m INNER JOIN media_play_history mph ON m.id = mph.media_id LEFT JOIN media_feedback mf ON m.id = mf.media_id WHERE m.family_id = ? GROUP BY m.id, mph.played_at ORDER BY mph.played_at DESC LIMIT ? ''', (family_id, limit)) recent = [dict(row) for row in cursor.fetchall()] conn.close() return jsonify({'recent_plays': recent}) # ==================== 静态文件服务 ==================== @app.route('/uploads/') def serve_upload(filename): """提供上传文件的访问""" return send_from_directory(app.config['UPLOAD_FOLDER'], filename) # ==================== 数字人媒体展示 API ==================== @app.route('/api/elderly/show-media', methods=['POST']) def show_media_on_avatar(): """ 控制老人端在数字人主页中部弹出透明窗口展示媒体文件 参数: - media_title: 媒体标题(用于查找媒体文件) - avatar_text: 数字人播报内容 - duration: 展示时长(秒),默认30秒 """ data = request.json required_fields = ['media_title', 'avatar_text'] if not all(field in data for field in required_fields): return jsonify({'error': '缺少必需字段: media_title 和 avatar_text'}), 400 media_title = data['media_title'] avatar_text = data['avatar_text'] duration = data.get('duration', 30) # 默认30秒 # 从数据库查找媒体文件 conn = get_db() cursor = conn.cursor() cursor.execute(''' SELECT id, media_type, file_path, title FROM media WHERE title = ? 
AND is_active = 1 LIMIT 1 ''', (media_title,)) media_row = cursor.fetchone() if not media_row: conn.close() return jsonify({'error': f'未找到标题为 "{media_title}" 的媒体文件'}), 404 media_dict = dict(media_row) media_type = media_dict['media_type'] file_path = media_dict['file_path'] # 提取文件名(不含路径) media_filename = os.path.basename(file_path) try: # 1. 推送播报内容到数字人(5000端口) avatar_response = requests.post( 'http://127.0.0.1:5000/transparent-pass', json={ 'user': 'User', 'text': avatar_text }, timeout=5 ) if not avatar_response.ok: print(f'推送数字人播报失败: {avatar_response.status_code}') # 2. 通知老人端弹出媒体展示窗口 # 创建媒体展示事件(使用 family_alerts 表的特殊类型) cursor.execute(''' INSERT INTO family_alerts ( family_id, alert_type, level, title, message, metadata, source ) VALUES (?, ?, ?, ?, ?, ?, ?) ''', ( data.get('family_id', 'family_001'), 'media_display', # 特殊类型:媒体展示 'low', media_title, # 使用媒体标题作为标题 avatar_text, json.dumps({ 'media_filename': media_filename, 'media_type': media_type, 'media_title': media_title, 'avatar_text': avatar_text, 'duration': duration, 'event_type': 'show_media' }), 'system' )) event_id = cursor.lastrowid conn.commit() conn.close() return jsonify({ 'success': True, 'event_id': event_id, 'message': '媒体展示请求已发送' }), 201 except Exception as e: print(f'处理媒体展示请求失败: {e}') return jsonify({'error': str(e)}), 500 @app.route('/api/elderly/hide-media', methods=['POST']) def hide_media_on_avatar(): """ 控制老人端关闭当前显示的媒体窗口 参数: - family_id: 家庭ID(可选,默认family_001) """ data = request.json or {} family_id = data.get('family_id', 'family_001') conn = get_db() cursor = conn.cursor() try: # 创建隐藏媒体事件 cursor.execute(''' INSERT INTO family_alerts ( family_id, alert_type, level, title, message, metadata, source ) VALUES (?, ?, ?, ?, ?, ?, ?) 
''', (
            family_id,
            'media_display',  # same alert type as the show-media event
            'low',
            '关闭媒体显示',
            '关闭当前显示的媒体',
            json.dumps({
                'event_type': 'hide_media'
            }),
            'system'
        ))

        event_id = cursor.lastrowid
        conn.commit()
        conn.close()

        return jsonify({
            'success': True,
            'event_id': event_id,
            'message': '关闭媒体请求已发送'
        }), 201

    except Exception as e:
        # Report the failure to the caller; the connection is closed here too
        print(f'处理关闭媒体请求失败: {e}')
        conn.close()
        return jsonify({'error': str(e)}), 500


@app.route('/api/elderly/poll-media-events', methods=['GET'])
def poll_media_events():
    """
    Polling endpoint for the elderly-side client: fetch one unread media
    display event for the family (``family_id`` query parameter, default
    ``family_001``) and mark it as read.
    """
    family_id = request.args.get('family_id', 'family_001')

    conn = get_db()
    cursor = conn.cursor()

    # Most recently created unread, active media display event
    cursor.execute('''
        SELECT * FROM family_alerts
        WHERE family_id = ?
        AND alert_type = 'media_display'
        AND read = 0
        AND is_active = 1
        ORDER BY created_at DESC
        LIMIT 1
    ''', (family_id,))

    row = cursor.fetchone()

    if row:
        alert = dict(row)
        # Decode the JSON metadata; malformed metadata becomes an empty dict
        if alert['metadata']:
            try:
                alert['metadata'] = json.loads(alert['metadata'])
            except:
                alert['metadata'] = {}

        # Mark the event as read
        cursor.execute('''
            UPDATE family_alerts
            SET read = 1, read_at = CURRENT_TIMESTAMP
            WHERE id = ?
''', (alert['id'],)) conn.commit() conn.close() return jsonify({'event': alert}) else: conn.close() return jsonify({'event': None}) # ==================== 老人端Toast通知 ==================== # 全局变量存储待显示的toast(内存中,重启会丢失) pending_toasts = {} # key: family_id, value: list of toast objects # SSE连接管理 sse_clients = {} # key: family_id, value: list of response queues @app.route('/api/elderly/toast', methods=['POST']) def create_toast(): """创建老人端Toast通知(供MCP工具调用)""" data = request.json family_id = data.get('family_id') toast_type = data.get('type', 'info') # success, info, calling message = data.get('message') duration = data.get('duration', 3000) # 默认3秒 if not family_id or not message: return jsonify({'error': '缺少必需参数'}), 400 # 创建toast对象 toast = { 'id': int(time.time() * 1000), # 使用时间戳作为ID 'type': toast_type, 'message': message, 'duration': duration, 'created_at': datetime.now().isoformat() } # 添加到待显示列表(备用轮询方式) if family_id not in pending_toasts: pending_toasts[family_id] = [] pending_toasts[family_id].append(toast) # 通过SSE推送给连接的客户端 if family_id in sse_clients: for client_queue in sse_clients[family_id]: try: client_queue.put(toast) except: pass # 客户端可能已断开 return jsonify({'success': True, 'toast_id': toast['id']}), 201 @app.route('/api/elderly/toast/poll', methods=['GET']) def poll_toast(): """老人端轮询获取待显示的Toast(备用方案)""" family_id = request.args.get('family_id') if not family_id: return jsonify({'error': '缺少family_id参数'}), 400 # 获取并清空该family的待显示toast toasts = pending_toasts.get(family_id, []) if toasts: # 返回最新的toast,并从列表中移除 toast = toasts.pop(0) return jsonify({'toast': toast}) return jsonify({'toast': None}) @app.route('/api/elderly/toast/stream', methods=['GET']) def toast_stream(): """SSE端点:实时推送Toast通知""" family_id = request.args.get('family_id') if not family_id: return jsonify({'error': '缺少family_id参数'}), 400 def generate(): import queue # 为此客户端创建队列 client_queue = queue.Queue() # 注册客户端 if family_id not in sse_clients: sse_clients[family_id] = [] 
sse_clients[family_id].append(client_queue) try: # 发送连接成功消息 yield f"data: {json.dumps({'type': 'connected'})}\n\n" # 持续监听队列 while True: try: # 等待新的toast(30秒超时,发送心跳) toast = client_queue.get(timeout=30) yield f"data: {json.dumps(toast)}\n\n" except queue.Empty: # 发送心跳保持连接 yield f": heartbeat\n\n" finally: # 客户端断开时清理 if family_id in sse_clients: sse_clients[family_id].remove(client_queue) if not sse_clients[family_id]: del sse_clients[family_id] return Response( generate(), mimetype='text/event-stream', headers={ 'Cache-Control': 'no-cache', 'X-Accel-Buffering': 'no', 'Connection': 'keep-alive' } ) # ==================== 健康检查 ==================== @app.route('/api/health', methods=['GET']) def health_check(): """健康检查端点""" return jsonify({'status': 'ok', 'timestamp': datetime.now().isoformat()}) if __name__ == '__main__': # 初始化数据库 init_db() print("数据库初始化完成") # 启动应用 app.run(host='0.0.0.0', port=8000, debug=True)