马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
×
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- # @Time : 2025
- # @Script : 地震预警监控脚本(青龙面板版-无频率限制)
- # @Description: 自动获取最优IP接口,支持地区选择推送,仅做频率提醒
- import requests
- import json
- import time
- import os
- import sys
- import hashlib
- import urllib3
- from datetime import datetime
- from typing import Dict, Any, Optional, Union, List
- # 抑制SSL警告(IP接口需要关闭SSL验证)
- urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
- """
- 接口盒子:提供各种免费API接口,集群服务器保障服务稳定。
- 官网:https://www.apihz.cn/
- 【青龙面板环境变量配置】
- export QUAKE_APIHZ_ID="88888888" # 必填:接口盒子用户ID
- export QUAKE_APIHZ_KEY="88888888" # 必填:接口盒子通讯秘钥
- export QUAKE_WECOM_KEY="xxxxxxxx-xxxx-xxxx" # 必填:企业微信机器人KEY
- export QUAKE_MIN_MAG="3.0" # 可选:最小推送震级(默认3.0)
- export QUAKE_MENTIONED_MOBILE="" # 可选:需要@的手机号(多个用逗号分隔)
- export QUAKE_ENABLED="true" # 可选:是否启用推送(true/false,默认true)
- export QUAKE_REGIONS="" # 可选:推送地区(多个用逗号分隔,如"四川,云南",留空则全国)
- 【重要提醒】
- 接口盒子免费用户限制:每分钟调用不能超过10次!
- 建议青龙定时规则设置为:*/6 * * * * (每6分钟一次)
- 如果设置过于频繁(如每分钟1次),可能导致API被封禁!
- """
- # -----------------从环境变量读取配置-----------------
- def load_config_from_env():
- """从青龙环境变量加载配置"""
- config = {
- 'apihz': {
- 'id': os.getenv('QUAKE_APIHZ_ID', ''),
- 'key': os.getenv('QUAKE_APIHZ_KEY', ''),
- 'getapi_url': 'https://api.apihz.cn/getapi.php',
- 'fallback_url': 'https://cn.apihz.cn/api/tianqi/dizhen.php',
- 'api_path': '/api/tianqi/dizhen.php',
- },
- 'wecom': {
- 'enabled': os.getenv('QUAKE_ENABLED', 'true').lower() == 'true',
- 'webhook_url': '',
- 'mentioned_mobile': [],
- 'filter': {
- 'min_magnitude': 3.0,
- 'regions': [],
- }
- },
- 'storage': {
- 'record_file': 'quake_apihz_records.json',
- 'max_records': 200,
- }
- }
-
- # 处理企业微信KEY
- wecom_key = os.getenv('QUAKE_WECOM_KEY', '')
- if wecom_key:
- config['wecom']['webhook_url'] = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={wecom_key}"
-
- # 处理最小震级
- try:
- min_mag = float(os.getenv('QUAKE_MIN_MAG', '3.0'))
- config['wecom']['filter']['min_magnitude'] = min_mag
- except:
- config['wecom']['filter']['min_magnitude'] = 3.0
-
- # 处理地区过滤
- regions = os.getenv('QUAKE_REGIONS', '')
- if regions:
- import re
- region_list = [r.strip() for r in re.split('[,,、]', regions) if r.strip()]
- config['wecom']['filter']['regions'] = region_list
-
- # 处理@手机号
- mentioned = os.getenv('QUAKE_MENTIONED_MOBILE', '')
- if mentioned:
- config['wecom']['mentioned_mobile'] = [phone.strip() for phone in mentioned.split(',') if phone.strip()]
-
- return config
- # -----------------原封不动的getpost函数-----------------
- def getpost(config: Dict[str, Any]) -> Union[str, Dict[str, Any]]:
- """
- 通用GET/POST请求函数(保持原代码不变)
- """
- if not config.get('url'):
- return json.dumps({'code': 400, 'msg': '请求地址不能为空!'}, ensure_ascii=False)
-
- url = config['url']
- method = 'POST' if config.get('type', 0) == 1 else 'GET'
- params = {}
- data = {}
-
- request_data = config.get('data', {})
- if request_data and isinstance(request_data, dict):
- if method == 'GET':
- params = request_data
- else:
- data = request_data
-
- headers = config.get('headers', {}).copy()
- if config.get('user_agent'):
- headers['User-Agent'] = config['user_agent']
-
- if method == 'POST' and 'Content-Type' not in headers and data:
- headers['Content-Type'] = 'application/x-www-form-urlencoded'
-
- cookies = config.get('cookies')
- if isinstance(cookies, str):
- cookies_dict = {}
- for cookie in cookies.split(';'):
- if '=' in cookie:
- key, value = cookie.strip().split('=', 1)
- cookies_dict[key] = value
- cookies = cookies_dict
-
- request_options = config.get('request_options', {})
- proxies = request_options.get('proxies')
- proxy_auth = request_options.get('proxy_auth')
-
- if proxy_auth and proxies:
- for proxy_type in ['http', 'https']:
- if proxy_type in proxies and isinstance(proxies[proxy_type], str):
- proxy_url = proxies[proxy_type]
- if proxy_url.startswith('http://'):
- proxy_url = proxy_url.replace('http://', f'http://{proxy_auth[0]}:{proxy_auth[1]}@')
- proxies[proxy_type] = proxy_url
-
- request_kwargs = {
- 'url': url, 'method': method, 'headers': headers,
- 'params': params, 'data': data, 'cookies': cookies,
- 'timeout': (request_options.get('connect_timeout', 10), request_options.get('timeout', 30)),
- 'verify': request_options.get('verify', True),
- 'allow_redirects': request_options.get('allow_redirects', True),
- 'stream': request_options.get('stream', False),
- 'proxies': proxies,
- }
- request_kwargs = {k: v for k, v in request_kwargs.items() if v is not None}
-
- try:
- response = requests.request(**request_kwargs)
- if config.get('encoding'):
- response.encoding = config['encoding']
- return response.text
- except requests.exceptions.Timeout as e:
- return json.dumps({'code': 400, 'msg': f'请求超时: {str(e)}'}, ensure_ascii=False)
- except requests.exceptions.ConnectionError as e:
- return json.dumps({'code': 400, 'msg': f'连接错误: {str(e)}'}, ensure_ascii=False)
- except requests.exceptions.RequestException as e:
- return json.dumps({'code': 400, 'msg': f'请求失败: {str(e)}'}, ensure_ascii=False)
- except Exception as e:
- return json.dumps({'code': 400, 'msg': f'未知错误: {str(e)}'}, ensure_ascii=False)
- # -----------------辅助函数-----------------
- def get_ql_path(filename: str) -> str:
- """获取青龙面板脚本目录路径"""
- ql_data_path = '/ql/data/scripts/'
- ql_old_path = '/ql/scripts/'
- current_path = os.path.dirname(os.path.abspath(__file__))
-
- if '/ql/' in current_path:
- return os.path.join(current_path, filename)
- if os.path.exists(ql_data_path):
- return os.path.join(ql_data_path, filename)
- elif os.path.exists(ql_old_path):
- return os.path.join(ql_old_path, filename)
- else:
- return os.path.join(os.getcwd(), filename)
- def load_pushed_records(filename: str) -> List[str]:
- """加载已推送记录"""
- filepath = get_ql_path(filename)
- if not os.path.exists(filepath):
- return []
- try:
- with open(filepath, 'r', encoding='utf-8') as f:
- return json.load(f)
- except Exception as e:
- print(f"⚠️ 读取历史记录失败:{e}")
- return []
- def save_pushed_records(filename: str, records: List[str], max_records: int = 200):
- """保存推送记录"""
- filepath = get_ql_path(filename)
- if len(records) > max_records:
- records = records[-max_records:]
- try:
- with open(filepath, 'w', encoding='utf-8') as f:
- json.dump(records, f, ensure_ascii=False)
- except Exception as e:
- print(f"⚠️ 保存记录失败:{e}")
- def get_optimal_api_url(api_config: Dict) -> str:
- """获取当前最优IP接口地址(带容错),并拼接接口路径"""
- getapi_config = {
- 'url': api_config['getapi_url'],
- 'type': 0,
- 'data': {
- 'id': api_config['id'],
- 'key': api_config['key'],
- },
- 'request_options': {
- 'timeout': 10,
- 'connect_timeout': 5,
- 'verify': False,
- 'allow_redirects': True,
- },
- 'headers': {
- 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
- 'Accept': 'application/json',
- },
- }
-
- print("🌐 正在获取最优IP接口地址...")
-
- try:
- response = getpost(getapi_config)
-
- try:
- result = json.loads(response) if isinstance(response, str) else response
-
- if isinstance(result, dict) and result.get('code') == 200:
- # 优先使用api字段
- if 'api' in result and result['api']:
- base_url = result['api'].strip()
- if base_url.endswith('/'):
- optimal_url = base_url + api_config['api_path'].lstrip('/')
- else:
- optimal_url = base_url + api_config['api_path']
- print(f"✅ 获取最优接口成功:{optimal_url}")
- return optimal_url
- elif 'url' in result and result['url']:
- optimal_url = result['url'].strip()
- print(f"✅ 获取最优接口成功(url字段):{optimal_url}")
- return optimal_url
- elif 'data' in result and isinstance(result['data'], dict):
- if 'url' in result['data']:
- optimal_url = result['data']['url'].strip()
- print(f"✅ 获取最优接口成功(data.url字段):{optimal_url}")
- return optimal_url
- elif 'api' in result['data']:
- optimal_url = result['data']['api'].strip()
- print(f"✅ 获取最优接口成功(data.api字段):{optimal_url}")
- return optimal_url
-
- print(f"⚠️ 返回格式异常,使用默认域名接口")
- return api_config['fallback_url']
- else:
- error_msg = result.get('msg', '未知错误') if isinstance(result, dict) else '解析失败'
- print(f"⚠️ 获取最优接口失败:{error_msg},将使用默认域名接口")
- return api_config['fallback_url']
-
- except json.JSONDecodeError:
- potential_url = response.strip() if isinstance(response, str) else ""
- if potential_url.startswith('http'):
- if potential_url.endswith('/'):
- optimal_url = potential_url + api_config['api_path'].lstrip('/')
- else:
- optimal_url = potential_url + api_config['api_path']
- print(f"✅ 获取最优接口成功(文本模式):{optimal_url}")
- return optimal_url
- else:
- print(f"⚠️ 返回非有效URL,使用默认域名接口")
- return api_config['fallback_url']
-
- except Exception as e:
- print(f"⚠️ 获取最优接口异常:{str(e)},将使用默认域名接口")
- return api_config['fallback_url']
- def generate_quake_id(quake: Dict) -> str:
- """生成唯一ID"""
- unique_str = f"{quake.get('weizhi', '')}_{quake.get('addtime', '')}_{quake.get('leve', '')}"
- return hashlib.md5(unique_str.encode('utf-8')).hexdigest()
- def send_wecom_notification(title: str, content: str, wecom_config: Dict) -> bool:
- """发送企业微信消息"""
- if not wecom_config['enabled']:
- print("ℹ️ 推送已禁用,跳过")
- return False
-
- webhook_url = wecom_config['webhook_url']
- if not webhook_url:
- print("❌ 错误:未配置企业微信机器人KEY")
- return False
-
- message = {
- "msgtype": "markdown",
- "markdown": {"content": f"**{title}**\n\n{content}"}
- }
- if wecom_config['mentioned_mobile']:
- message["mentioned_mobile_list"] = wecom_config['mentioned_mobile']
-
- try:
- resp = requests.post(
- webhook_url,
- headers={'Content-Type': 'application/json'},
- data=json.dumps(message, ensure_ascii=False).encode('utf-8'),
- timeout=10
- )
- result = resp.json()
- if result.get('errcode') == 0:
- print(f"✅ 推送成功:{title}")
- return True
- else:
- print(f"❌ 推送失败:{result.get('errmsg')}")
- return False
- except Exception as e:
- print(f"❌ 推送异常:{str(e)}")
- return False
- def format_quake_message(quake: Dict) -> str:
- """格式化地震信息"""
- location = quake.get('weizhi', '未知地点')
- magnitude = float(quake.get('leve', 0))
- depth = float(quake.get('shendu', 0))
- latitude = float(quake.get('weidu', 0))
- longitude = float(quake.get('jingdu', 0))
- occur_time = quake.get('addtime', '未知时间')
- cache_time = quake.get('hctime', '未知时间')
-
- lat_dir = "N" if latitude >= 0 else "S"
- lon_dir = "E" if longitude >= 0 else "W"
- coord_str = f"{abs(latitude):.2f}°{lat_dir} {abs(longitude):.2f}°{lon_dir}"
-
- if depth < 10:
- depth_desc = "浅源地震(破坏力较强)"
- elif depth < 30:
- depth_desc = "中源地震"
- else:
- depth_desc = "深源地震(破坏力较弱)"
-
- content = f"""**🎯 震中位置**:{location}
- **📈 地震等级**:{magnitude}级
- **🕳️ 震源深度**:{depth}km({depth_desc})
- **🌍 震中坐标**:{coord_str}
- **🕐 发生时间**:{occur_time}
- **📊 数据同步**:{cache_time}
- """
-
- if magnitude >= 6.0:
- content += "\n> 🔴 **严重警报**:强震级别,请注意安全!"
- elif magnitude >= 5.0:
- content += "\n> 🟠 **重要提醒**:显著地震,震感强烈!"
- elif magnitude >= 4.0:
- content += "\n> 🟡 **温馨提示**:有感地震,注意悬挂物晃动。"
-
- return content
- def should_push_quake(quake: Dict, filter_config: Dict, pushed_records: List[str]) -> tuple:
- """
- 判断是否推送
- 返回: (是否推送: bool, 原因: str)
- """
- unique_id = generate_quake_id(quake)
- location = quake.get('weizhi', '')
-
- # 1. 检查是否已推送
- if unique_id in pushed_records:
- return False, "已推送过"
-
- # 2. 震级检查
- try:
- magnitude = float(quake.get('leve', 0))
- if magnitude < filter_config['min_magnitude']:
- return False, f"震级{magnitude}低于阈值{filter_config['min_magnitude']}"
- except:
- return False, "震级数据异常"
-
- # 3. 地区检查
- regions = filter_config.get('regions', [])
- if regions:
- matched = False
- for region in regions:
- if region in location:
- matched = True
- break
-
- if not matched:
- return False, f"地区[{location}]不在监控列表{regions}中"
-
- return True, "符合推送条件"
- # -----------------主程序-----------------
- def main():
- print(f"🌍 地震预警监控启动(青龙面板版)")
- print(f"⏰ 当前时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
- print("-" * 50)
-
- # ⚠️ 重要频率提醒
- print("⚠️ 【重要提醒】接口盒子免费用户限制:每分钟调用不能超过10次!")
- print("⚠️ 建议青龙定时规则设置为:*/6 * * * * (每6分钟一次)")
- print("⚠️ 过于频繁(如每分钟1次)可能导致API被封禁!")
- print("-" * 50)
-
- # 加载配置
- config = load_config_from_env()
-
- # 验证必要配置
- if not config['apihz']['id'] or not config['apihz']['key']:
- print("❌ 错误:缺少必要环境变量 QUAKE_APIHZ_ID 或 QUAKE_APIHZ_KEY")
- sys.exit(1)
-
- if not config['wecom']['webhook_url']:
- print("❌ 错误:缺少必要环境变量 QUAKE_WECOM_KEY")
- sys.exit(1)
-
- # 显示配置信息
- print(f"📊 配置信息:")
- print(f" 用户ID:{config['apihz']['id']}")
- print(f" 最小震级:{config['wecom']['filter']['min_magnitude']}级")
- if config['wecom']['filter']['regions']:
- print(f" 监控地区:{', '.join(config['wecom']['filter']['regions'])}(仅推送这些地区)")
- else:
- print(f" 监控地区:全国(未配置地区限制)")
- print(f" 推送状态:{'启用' if config['wecom']['enabled'] else '禁用'}")
- if config['wecom']['mentioned_mobile']:
- print(f" @手机号:{', '.join(config['wecom']['mentioned_mobile'])}")
- print("-" * 50)
-
- # 1. 动态获取最优IP接口
- optimal_url = get_optimal_api_url(config['apihz'])
- print(f"🎯 实际请求地址:{optimal_url}")
- print("-" * 50)
-
- # 2. 构建地震API配置
- quake_config = {
- 'url': optimal_url,
- 'type': 0,
- 'data': {
- 'id': config['apihz']['id'],
- 'key': config['apihz']['key'],
- },
- 'request_options': {
- 'timeout': 30,
- 'connect_timeout': 10,
- 'verify': False if 'http://' in optimal_url else True,
- 'allow_redirects': True,
- },
- 'headers': {
- 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
- 'Accept': 'application/json',
- },
- }
-
- # 3. 加载历史记录
- record_file = config['storage']['record_file']
- pushed_records = load_pushed_records(record_file)
- print(f"📋 已加载 {len(pushed_records)} 条历史推送记录")
-
- # 4. 请求地震数据(⚠️ 无频率限制,用户自行控制)
- print("🔍 正在请求地震数据...")
- response = getpost(quake_config)
-
- # 5. 解析响应
- try:
- result = json.loads(response) if isinstance(response, str) else response
-
- if isinstance(result, dict) and result.get('code') == 200:
- quake_list = result.get('data', [])
- print(f"✅ 成功获取 {len(quake_list)} 条地震记录")
- else:
- error_msg = result.get('msg', '未知错误') if isinstance(result, dict) else str(response)[:200]
- print(f"❌ API错误:{error_msg}")
- sys.exit(1)
- except Exception as e:
- print(f"❌ 解析失败:{str(e)}")
- sys.exit(1)
-
- # 6. 处理数据
- new_records = []
- push_count = 0
- skip_count = 0
- skip_reasons = {}
-
- # 去重
- unique_quakes = {}
- for quake in quake_list:
- key = f"{quake.get('weizhi')}_{quake.get('addtime')}_{quake.get('leve')}"
- if key not in unique_quakes:
- unique_quakes[key] = quake
-
- filtered_list = list(unique_quakes.values())
-
- for quake in filtered_list:
- unique_id = generate_quake_id(quake)
- location = quake.get('weizhi', '未知地点')
- magnitude = float(quake.get('leve', 0))
-
- should_push, reason = should_push_quake(quake, config['wecom']['filter'], pushed_records)
-
- if should_push:
- if magnitude >= 6.0:
- title = f"🔴🔴🔴 强震警报:{location}发生{magnitude}级地震"
- elif magnitude >= 5.0:
- title = f"🔴 强震预警:{location}发生{magnitude}级地震"
- elif magnitude >= 4.0:
- title = f"🟠 显著地震:{location}发生{magnitude}级地震"
- else:
- title = f"🟡 地震信息:{location}发生{magnitude}级地震"
-
- content = format_quake_message(quake)
-
- if send_wecom_notification(title, content, config['wecom']):
- push_count += 1
-
- new_records.append(unique_id)
- else:
- skip_count += 1
- skip_reasons[reason] = skip_reasons.get(reason, 0) + 1
-
- if unique_id not in pushed_records:
- new_records.append(unique_id)
-
- # 7. 保存记录
- if new_records:
- all_records = pushed_records + new_records
- save_pushed_records(record_file, all_records, config['storage']['max_records'])
-
- print("-" * 50)
- print(f"✅ 执行完成:推送 {push_count} 条,跳过 {skip_count} 条")
-
- if skip_reasons:
- print("📊 跳过原因统计:")
- for reason, count in skip_reasons.items():
- print(f" - {reason}: {count}条")
-
- if push_count == 0 and len(filtered_list) > 0:
- print("ℹ️ 本次运行无符合条件的地震推送")
-
- print(f"🏠 记录文件:{get_ql_path(record_file)}")
-
- # 再次提醒频率限制
- print("-" * 50)
- print("⚠️ 提醒:本次请求已计入频次,免费用户每分钟不能超过10次!")
- print("⚠️ 请确保青龙定时设置合理,避免API被封禁!")
- if __name__ == '__main__':
- main()
复制代码
|