具体脚本如下:
import json
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
from typing import Optional, Union, Dict, Tuple, Any

import mysql.connector
import pyodbc
import requests


class DatabaseConnection(ABC):
    """Abstract interface that every database connection wrapper implements."""

    @abstractmethod
    def connect(self):
        """Open the underlying connection."""

    @abstractmethod
    def disconnect(self):
        """Release the underlying connection."""

    @abstractmethod
    def execute_query(self, query: str, params: Optional[Tuple[Any, ...]] = None):
        """Run ``query`` (optionally with bound ``params``) and return its rows."""


class MySQLConnection(DatabaseConnection):
    """``DatabaseConnection`` implementation backed by ``mysql.connector``."""

    def __init__(self, host: str, database: str, username: str, password: str,
                 port: int = 3306):
        """Store the connection settings; no connection is opened here."""
        self.host = host
        self.database = database
        self.username = username
        self.password = password
        self.port = port
        # Both handles stay None until connect() is called.
        self.connection = None
        self.cursor = None

    def connect(self):
        """Open the MySQL connection and create the cursor used for queries."""
        self.connection = mysql.connector.connect(
            host=self.host,
            database=self.database,
            user=self.username,
            password=self.password,
            port=self.port,
        )
        self.cursor = self.connection.cursor()

    def disconnect(self):
        """Close the cursor and the connection.

        Safe to call more than once: the handles are cleared before closing,
        so a second call (for example from ``VehicleDataService.__del__``
        after an explicit close) is a no-op.
        """
        cursor, connection = self.cursor, self.connection
        self.cursor = None
        self.connection = None
        try:
            if cursor is not None:
                cursor.close()
        finally:
            # Close the connection even if closing the cursor failed.
            if connection is not None:
                connection.close()

    def execute_query(self, query: str, params: Optional[Tuple[Any, ...]] = None):
        """Execute ``query`` and return all rows from ``fetchall()``.

        Args:
            query: SQL text, using ``%s`` placeholders for bound values.
            params: Values for the placeholders; omitted or empty means the
                query is executed without parameters.

        Raises:
            Exception: Wraps any ``mysql.connector.Error``; the original error
                is kept as ``__cause__``.
        """
        try:
            if params:
                self.cursor.execute(query, params)
            else:
                self.cursor.execute(query)
            return self.cursor.fetchall()
        except mysql.connector.Error as e:
            raise Exception(f"MySQL查询错误: {e}") from e


class VehicleDataService:
    """Holds a database connection and keeps it open for its own lifetime."""

    def __init__(self, db_connection: DatabaseConnection):
        """Store ``db_connection`` and connect it immediately."""
        self.db_connection = db_connection
        self.db_connection.connect()

    def __del__(self):
        # Best-effort cleanup only: errors are ignored on purpose because
        # nothing useful can be done with them during finalization.
        # `hasattr` covers the case where __init__ failed before assignment.
        try:
            if hasattr(self, 'db_connection'):
                self.db_connection.disconnect()
        except Exception:
            pass


class TrafficDataPrint:
    """Queries the daily traffic count and pushes it to a Feishu webhook."""

    def __init__(self, traffic_service: VehicleDataService, webhook_url: str,
                 timeout: float = 10.0):
        """
        Args:
            traffic_service: Service whose ``db_connection`` runs the query.
            webhook_url: Feishu bot webhook URL the report is posted to.
            timeout: Seconds to wait for the webhook request before giving up.
        """
        self.traffic_service = traffic_service
        self.webhook_url = webhook_url
        self.timeout = timeout

    def format_traffic_message(self, date: str, count: int) -> Dict:
        """Build the Feishu ``post`` message payload for one day's count.

        Args:
            date: Date label shown in the report.
            count: Traffic count; rendered with thousands separators.

        Returns:
            The message as a JSON-serializable dict.
        """
        return {
            "msg_type": "post",
            "content": {
                "post": {
                    "zh_cn": {
                        "title": "车流数据统计",
                        "content": [
                            [
                                {
                                    "tag": "text",
                                    "text": "📊 车流统计报告\n\n",
                                },
                                {
                                    "tag": "text",
                                    "text": f"📅 统计日期:{date}\n",
                                },
                                {
                                    "tag": "text",
                                    "text": f"🚗 车流总量:{count:,} 辆次\n",
                                },
                            ]
                        ],
                    }
                }
            },
        }

    def send_to_feishu(self, message: Dict) -> bool:
        """POST ``message`` to the webhook.

        Returns:
            True only when the HTTP status is 200 and the response JSON has
            ``code == 0``. Every failure, including any exception, is printed
            and reported as False.
        """
        try:
            headers = {'Content-Type': 'application/json'}
            response = requests.post(
                self.webhook_url,
                headers=headers,
                data=json.dumps(message),
                # Without a timeout a stalled endpoint would block forever.
                timeout=self.timeout,
            )

            if response.status_code != 200:
                print(f"HTTP错误: {response.status_code}")
                return False

            result = response.json()
            if result.get('code') == 0:
                return True

            print(f"发送失败: {result.get('msg')}")
            return False
        except Exception as e:
            # Boundary handler: the caller only needs a success flag.
            print(f"发送消息时发生错误: {e}")
            return False

    def get_and_send_traffic_data(self, date: Optional[str] = None) -> bool:
        """Count one day's records and send the report.

        Args:
            date: Day to report, passed to SQL ``DATE(%s)``; defaults to
                yesterday (local time) formatted as ``YYYY-MM-DD``.

        Returns:
            True when the message was sent successfully, False otherwise
            (no data, query error, or send failure).
        """
        try:
            if date is None:
                yesterday = datetime.now() - timedelta(days=1)
                date = yesterday.strftime('%Y-%m-%d')

            # Counts rows for the given day on three fixed channel IDs.
            # NOTE(review): the meaning of channels 27/28/37 is not visible
            # here - confirm against the parking-system configuration.
            query = """
                SELECT COUNT(CarCode)
                FROM tc_usercrdtm
                WHERE DATE(Crdtm) = DATE(%s)
                  AND ChannelID IN ('27','28','37')
            """
            results = self.traffic_service.db_connection.execute_query(query, (date,))

            if not results or results[0][0] is None:
                print(f"未找到 {date} 的车流数据")
                return False

            traffic_count = results[0][0]
            message = self.format_traffic_message(date, traffic_count)
            return self.send_to_feishu(message)
        except Exception as e:
            print(f"处理车流数据时发生错误: {e}")
            return False


def main():
    """Send yesterday's traffic report to Feishu and print the outcome."""
    mysql_config = {
        'host': '127.0.0.1',
        'database': 'acs_parking_video',
        'username': '****',  # database username (original note: "立方" data)
        'password': '****',  # database password (original note: "立方" database)
        'port': 3306
    }

    # Feishu webhook configuration.
    webhook_url = "******"

    try:
        db_connection = MySQLConnection(**mysql_config)
        try:
            # The service opens the connection in its constructor.
            traffic_service = VehicleDataService(db_connection)
            traffic_printer = TrafficDataPrint(traffic_service, webhook_url)

            # No date argument: reports yesterday's data.
            success = traffic_printer.get_and_send_traffic_data()

            if success:
                print("车流数据已成功发送到飞书")
            else:
                print("车流数据发送失败")
        finally:
            # Close explicitly instead of relying on __del__ timing.
            db_connection.disconnect()
    except Exception as e:
        print(f"错误: {e}")


if __name__ == "__main__":
    main()