做视频网站侵权吗获客引流100种方法
2026/1/9 8:44:00 网站建设 项目流程
做视频网站侵权吗,获客引流100种方法,it培训网站,提供设计的的网站目录后台任务与WebSocket实时应用开发指南1. 引言#xff1a;现代Web应用的实时需求 {#引言}1.1 实时应用的关键指标1.2 技术选型对比2. 技术架构概览 {#技术架构}2.1 系统架构图2.2 核心组件交互流程3. 后台任务系统设计与实现 {#后台任务系统}3.1 任务调度器设计4. WebSocke…目录后台任务与WebSocket实时应用开发指南1. 引言现代Web应用的实时需求 {#引言}1.1 实时应用的关键指标1.2 技术选型对比2. 技术架构概览 {#技术架构}2.1 系统架构图2.2 核心组件交互流程3. 后台任务系统设计与实现 {#后台任务系统}3.1 任务调度器设计4. WebSocket服务器深度实现 {#websocket服务器}4.1 WebSocket服务器核心5. 前后端集成与通信协议 {#集成通信}5.1 通信协议设计6. 完整实战案例实时股票监控系统 {#实战案例}7. 性能优化与最佳实践 {#性能优化}7.1 WebSocket性能优化7.2 数据库优化8. 监控与调试 {#监控调试}8.1 全面的监控系统8.2 调试工具9. 部署与扩展 {#部署扩展}9.1 容器化部署9.2 水平扩展策略10. 总结与展望 {#总结}10.1 关键成果10.2 性能基准10.3 未来发展方向10.4 数学优化展望10.5 结束语『宝藏代码胶囊开张啦』—— 我的 CodeCapsule 来咯✨写代码不再头疼我的新站点 CodeCapsule 主打一个 “白菜价”“量身定制”无论是卡脖子的毕设/课设/文献复现需要灵光一现的算法改进还是想给项目加个“外挂”这里都有便宜又好用的代码方案等你发现低成本高适配助你轻松通关速来围观 CodeCapsule官网后台任务与WebSocket实时应用开发指南1. 引言现代Web应用的实时需求 {#引言}随着Web应用复杂度的增加用户对实时性的需求日益增长。根据2024年Stack Overflow开发者调查实时Web技术已成为增长最快的技能之一需求同比增长85%。从金融交易到协作工具从游戏到物联网监控实时数据传输已成为现代应用的标配。1.1 实时应用的关键指标实时应用需要满足以下关键性能指标延迟100ms为优秀100-300ms为可接受吞吐量单节点支持10K并发连接可用性99.99%以上消息丢失率0.01%1.2 技术选型对比实时通信技术WebSocketServer-Sent Events长轮询WebRTC全双工低延迟适合实时游戏/聊天单向服务器推送自动重连适合股票行情/通知兼容性好高延迟资源消耗大P2P通信音视频传输复杂但强大2. 技术架构概览 {#技术架构}2.1 系统架构图数据层后台任务层消息中间件层应用服务器层负载均衡层客户端层数据库缓存文件存储任务调度器任务执行器结果处理器消息队列发布订阅系统WebSocket服务器1WebSocket服务器2WebSocket服务器3负载均衡器Web浏览器移动应用桌面应用2.2 核心组件交互流程1. 客户端连接 → WebSocket握手 → 连接建立 2. 后台任务触发 → 消息队列 → 任务执行 3. 任务完成 → 发布订阅 → WebSocket推送 4. 客户端接收 → UI更新 → 用户交互 \begin{aligned} 1.\ \text{客户端连接} \rightarrow \text{WebSocket握手} \rightarrow \text{连接建立}\\ 2.\ \text{后台任务触发} \rightarrow \text{消息队列} \rightarrow \text{任务执行}\\ 3.\ \text{任务完成} \rightarrow \text{发布订阅} \rightarrow \text{WebSocket推送}\\ 4.\ \text{客户端接收} \rightarrow \text{UI更新} \rightarrow \text{用户交互} \end{aligned}​1.客户端连接→WebSocket握手→连接建立2.后台任务触发→消息队列→任务执行3.任务完成→发布订阅→WebSocket推送4.客户端接收→UI更新→用户交互​3. 后台任务系统设计与实现 {#后台任务系统}3.1 任务调度器设计 后台任务调度系统 支持定时任务、延迟任务、周期任务、任务依赖、任务重试 importasyncioimporttimeimportuuidimportpicklefromabcimportABC,abstractmethodfromtypingimportAny,Dict,List,Optional,Callable,UnionfromenumimportEnumfromdatetimeimportdatetime,timedeltafromdataclassesimportdataclass,fieldimportheapqimportloggingfromconcurrent.futuresimportThreadPoolExecutor,ProcessPoolExecutorimportredis.asyncioasaioredisfrompydanticimportBaseModel,Field,validatorclassTaskStatus(Enum):任务状态枚举PENDINGpendingSCHEDULEDscheduledRUNNINGrunningSUCCESSsuccessFAILEDfailedCANCELLEDcancelledRETRYINGretryingclassTaskPriority(Enum):任务优先级LOW0NORMAL1HIGH2CRITICAL3dataclass(orderTrue)classScheduledTask:调度任务数据类execute_at:float# 执行时间戳priority:intfield(compareFalse)task_id:strfield(compareFalse)data:Anyfield(compareFalse)def__post_init__(self):# 确保优先级转换为数值ifisinstance(self.priority,TaskPriority):self.priorityself.priority.valueclassTask(BaseModel):任务模型id:strField(default_factorylambda:str(uuid.uuid4()))name:strfunc_name:strargs:List[Any]Field(default_factorylist)kwargs:Dict[str,Any]Field(default_factorydict)# 调度相关schedule_type:strimmediate# immediate, delayed, cronexecute_at:Optional[datetime]Nonecron_expression:Optional[str]Noneinterval_seconds:Optional[int]None# 执行控制max_retries:int3retry_delay:int60# 秒timeout:Optional[int]Nonepriority:TaskPriorityTaskPriority.NORMAL# 依赖关系depends_on:List[str]Field(default_factorylist)# 状态追踪status:TaskStatusTaskStatus.PENDING created_at:datetimeField(default_factorydatetime.now)started_at:Optional[datetime]Nonecompleted_at:Optional[datetime]Noneattempts:int0result:Optional[Any]Noneerror:Optional[str]Nonetraceback:Optional[str]None# 元数据metadata:Dict[str,Any]Field(default_factorydict)classConfig:arbitrary_types_allowedTruejson_encoders{datetime:lambdadt:dt.isoformat(),TaskPriority:lambdap:p.value,TaskStatus:lambdas:s.value,}validator(schedule_type)defvalidate_schedule_type(cls,v):allowed[immediate,delayed,cron,interval]ifvnotinallowed:raiseValueError(fschedule_type must be one of{allowed})returnvdefget_execution_time(self)-Optional[float]:获取执行时间戳ifself.execute_at:returnself.execute_at.timestamp()returnNonedefshould_retry(self)-bool:判断是否应该重试return(self.statusTaskStatus.FAILEDandself.attemptsself.max_retries)defcalculate_next_retry(self)-datetime:计算下次重试时间delayself.retry_delay*(2**(self.attempts-1))# 指数退避returndatetime.now()timedelta(secondsdelay)classTaskResult(BaseModel):任务结果模型task_id:strstatus:TaskStatus result:Optional[Any]Noneerror:Optional[str]Noneexecution_time:Optional[float]None# 执行耗时秒completed_at:datetimeField(default_factorydatetime.now)classConfig:json_encoders{datetime:lambdadt:dt.isoformat(),TaskStatus:lambdas:s.value,}classTaskRegistry:任务注册表def__init__(self):self.tasks:Dict[str,Callable]{}defregister(self,name:strNone):注册任务装饰器defdecorator(func):task_namenameorfunc.__name__ self.tasks[task_name]funcreturnfuncreturndecoratordefget_task(self,name:str)-Optional[Callable]:获取任务函数returnself.tasks.get(name)deflist_tasks(self)-List[str]:列出所有注册的任务returnlist(self.tasks.keys())classBaseTaskBackend(ABC):任务后端基类abstractmethodasyncdefenqueue(self,task:Task)-str:入队任务passabstractmethodasyncdefdequeue(self)-Optional[Task]:出队任务passabstractmethodasyncdefget_task(self,task_id:str)-Optional[Task]:获取任务passabstractmethodasyncdefupdate_task(self,task:Task)-bool:更新任务passabstractmethodasyncdefdelete_task(self,task_id:str)-bool:删除任务passclassMemoryTaskBackend(BaseTaskBackend):内存任务后端用于开发和测试def__init__(self):self.tasks:Dict[str,Task]{}self.scheduled_queue:List[ScheduledTask][]self.pending_queue:List[Task][]self.lockasyncio.Lock()asyncdefenqueue(self,task:Task)-str:asyncwithself.lock:self.tasks[task.id]taskiftask.schedule_typeimmediate:heapq.heappush(self.pending_queue,task)eliftask.schedule_typedelayedandtask.execute_at:scheduledScheduledTask(execute_attask.execute_at.timestamp(),prioritytask.priority.value,task_idtask.id,datatask)heapq.heappush(self.scheduled_queue,scheduled)returntask.idasyncdefdequeue(self)-Optional[Task]:asyncwithself.lock:# 检查是否有到期的定时任务nowtime.time()whileself.scheduled_queueandself.scheduled_queue[0].execute_atnow:scheduledheapq.heappop(self.scheduled_queue)taskscheduled.data task.statusTaskStatus.PENDING heapq.heappush(self.pending_queue,task)ifself.pending_queue:returnheapq.heappop(self.pending_queue)returnNoneasyncdefget_task(self,task_id:str)-Optional[Task]:returnself.tasks.get(task_id)asyncdefupdate_task(self,task:Task)-bool:asyncwithself.lock:iftask.idinself.tasks:self.tasks[task.id]taskreturnTruereturnFalseasyncdefdelete_task(self,task_id:str)-bool:asyncwithself.lock:iftask_idinself.tasks:delself.tasks[task_id]returnTruereturnFalseclassRedisTaskBackend(BaseTaskBackend):Redis任务后端生产环境def__init__(self,redis_url:strredis://localhost:6379):self.redis_urlredis_url self.redis:Optional[aioredis.Redis]Noneself.pubsub:Optional[aioredis.PubSub]None# Redis键名self.TASK_PREFIXtask:self.PENDING_QUEUEtasks:pendingself.SCHEDULED_SETtasks:scheduledself.RESULTS_PREFIXtask:result:asyncdefconnect(self):连接Redisself.redisawaitaioredis.from_url(self.redis_url)self.pubsubself.redis.pubsub()asyncdefdisconnect(self):断开Redis连接ifself.redis:awaitself.redis.close()def_task_key(self,task_id:str)-str:生成任务键名returnf{self.TASK_PREFIX}{task_id}def_result_key(self,task_id:str)-str:生成结果键名returnf{self.RESULTS_PREFIX}{task_id}asyncdefenqueue(self,task:Task)-str:入队任务到Redisifnotself.redis:awaitself.connect()# 序列化任务task_datatask.json()# 存储任务task_keyself._task_key(task.id)awaitself.redis.setex(task_key,86400,# 24小时TTLtask_data)# 根据调度类型放入不同队列iftask.schedule_typeimmediate:# 使用有序集合按优先级排序scoretask.priority.value*1000000time.time()awaitself.redis.zadd(self.PENDING_QUEUE,{task.id:score})eliftask.schedule_typedelayedandtask.execute_at:# 放入延迟集合execute_attask.execute_at.timestamp()awaitself.redis.zadd(self.SCHEDULED_SET,{task.id:execute_at})# 发布新任务事件awaitself.redis.publish(tasks:new,task.id)returntask.idasyncdefdequeue(self)-Optional[Task]:从Redis出队任务ifnotself.redis:awaitself.connect()# 检查延迟任务nowtime.time()delayed_tasksawaitself.redis.zrangebyscore(self.SCHEDULED_SET,-inf,now,start0,num1,withscoresTrue)ifdelayed_tasks:task_id,_delayed_tasks[0]# 移动到待处理队列awaitself.redis.zrem(self.SCHEDULED_SET,task_id)awaitself.redis.zadd(self.PENDING_QUEUE,{task_id:now})# 获取最高优先级的任务resultawaitself.redis.zpopmin(self.PENDING_QUEUE,count1)ifnotresult:returnNonetask_id,_result[0]task_dataawaitself.redis.get(self._task_key(task_id))ifnottask_data:returnNone# 反序列化任务task_dictjson.loads(task_data)returnTask(**task_dict)asyncdefget_task(self,task_id:str)-Optional[Task]:从Redis获取任务ifnotself.redis:awaitself.connect()task_dataawaitself.redis.get(self._task_key(task_id))ifnottask_data:returnNonetask_dictjson.loads(task_data)returnTask(**task_dict)asyncdefupdate_task(self,task:Task)-bool:更新Redis中的任务ifnotself.redis:awaitself.connect()task_datatask.json()task_keyself._task_key(task.id)awaitself.redis.setex(task_key,86400,task_data)returnTrueasyncdefdelete_task(self,task_id:str)-bool:从Redis删除任务ifnotself.redis:awaitself.connect()# 删除任务数据task_keyself._task_key(task_id)result_keyself._result_key(task_id)awaitself.redis.delete(task_key,result_key)# 从所有队列中移除awaitself.redis.zrem(self.PENDING_QUEUE,task_id)awaitself.redis.zrem(self.SCHEDULED_SET,task_id)returnTrueasyncdefstore_result(self,result:TaskResult):存储任务结果ifnotself.redis:awaitself.connect()result_dataresult.json()result_keyself._result_key(result.task_id)# 存储结果设置TTLawaitself.redis.setex(result_key,604800,# 7天TTLresult_data)# 发布任务完成事件awaitself.redis.publish(tasks:completed,result.task_id)classTaskScheduler:任务调度器def__init__(self,backend:BaseTaskBackendNone,max_workers:int10,enable_metrics:boolTrue):self.backendbackendorMemoryTaskBackend()self.registryTaskRegistry()self.executorThreadPoolExecutor(max_workersmax_workers)self.process_executorProcessPoolExecutor(max_workersmax_workers//2)# 任务状态追踪self.running_tasks:Dict[str,asyncio.Task]{}self.metrics{tasks_executed:0,tasks_failed:0,tasks_succeeded:0,total_execution_time:0.0,average_execution_time:0.0,}ifenable_metricselseNone# 调度器状态self.is_runningFalseself.scheduler_task:Optional[asyncio.Task]None# 事件回调self.on_task_started:List[Callable][]self.on_task_completed:List[Callable][]self.on_task_failed:List[Callable][]self.loggerlogging.getLogger(task_scheduler)deftask(self,name:strNone,max_retries:int3):任务装饰器defdecorator(func):task_namenameorfunc.__name__ self.registry.tasks[task_name]func# 创建包装函数asyncdefwrapper(*args,**kwargs):returnawaitfunc(*args,**kwargs)wrapper.task_nametask_name wrapper.max_retriesmax_retriesreturnwrapperreturndecoratorasyncdefstart(self):启动调度器ifself.is_running:returnself.is_runningTrueself.scheduler_taskasyncio.create_task(self._scheduler_loop())self.logger.info(Task scheduler started)asyncdefstop(self):停止调度器ifnotself.is_running:returnself.is_runningFalseifself.scheduler_task:self.scheduler_task.cancel()try:awaitself.scheduler_taskexceptasyncio.CancelledError:pass# 等待所有任务完成awaitasyncio.gather(*self.running_tasks.values(),return_exceptionsTrue)# 关闭执行器self.executor.shutdown(waitTrue)self.process_executor.shutdown(waitTrue)self.logger.info(Task scheduler stopped)asyncdef_scheduler_loop(self):调度器主循环whileself.is_running:try:# 获取下一个任务taskawaitself.backend.dequeue()iftask:# 检查任务依赖iftask.depends_on:deps_metawaitself._check_dependencies(task)ifnotdeps_met:# 重新入队稍后重试awaitasyncio.sleep(1)awaitself.backend.enqueue(task)continue# 执行任务task.statusTaskStatus.RUNNING task.started_atdatetime.now()task.attempts1awaitself.backend.update_task(task)# 触发任务开始事件awaitself._trigger_event(self.on_task_started,task)# 创建异步任务执行task_coroself._execute_task(task)running_taskasyncio.create_task(task_coro)self.running_tasks[task.id]running_task# 设置回调清理running_task.add_done_callback(lambdat,tidtask.id:self.running_tasks.pop(tid,None))else:# 没有任务等待一段时间awaitasyncio.sleep(0.1)exceptasyncio.CancelledError:breakexceptExceptionase:self.logger.error(fScheduler loop error:{e})awaitasyncio.sleep(1)asyncdef_execute_task(self,task:Task):执行单个任务start_timetime.time()try:# 获取任务函数task_funcself.registry.get_task(task.func_name)ifnottask_func:raiseValueError(fTask function {task.func_name} not found)# 执行任务ifasyncio.iscoroutinefunction(task_func):# 异步函数resultawaittask_func(*task.args,**task.kwargs)else:# 同步函数在线程池中执行loopasyncio.get_event_loop()resultawaitloop.run_in_executor(self.executor,task_func,*task.args,**task.kwargs)# 更新任务状态task.statusTaskStatus.SUCCESS task.resultresult task.completed_atdatetime.now()# 存储结果task_resultTaskResult(task_idtask.id,statusTaskStatus.SUCCESS,resultresult,execution_timetime.time()-start_time)ifisinstance(self.backend,RedisTaskBackend):awaitself.backend.store_result(task_result)# 触发任务完成事件awaitself._trigger_event(self.on_task_completed,task,result)# 更新指标ifself.metrics:self._update_metrics(task_result)self.logger.info(fTask{task.id}completed successfully)exceptExceptionase:# 任务执行失败task.statusTaskStatus.FAILED task.errorstr(e)task.completed_atdatetime.now()# 检查是否需要重试iftask.should_retry():task.statusTaskStatus.RETRYING task.execute_attask.calculate_next_retry()task.schedule_typedelayed# 重新入队awaitself.backend.enqueue(task)self.logger.info(fTask{task.id}scheduled for retry)else:# 最终失败task_resultTaskResult(task_idtask.id,statusTaskStatus.FAILED,errorstr(e),execution_timetime.time()-start_time)ifisinstance(self.backend,RedisTaskBackend):awaitself.backend.store_result(task_result)# 触发任务失败事件awaitself._trigger_event(self.on_task_failed,task,e)# 更新指标ifself.metrics:self.metrics[tasks_failed]1self.logger.error(fTask{task.id}failed:{e})finally:# 更新任务状态awaitself.backend.update_task(task)asyncdef_check_dependencies(self,task:Task)-bool:检查任务依赖是否满足fordep_idintask.depends_on:dep_taskawaitself.backend.get_task(dep_id)ifnotdep_taskordep_task.status!TaskStatus.SUCCESS:returnFalsereturnTrueasyncdef_trigger_event(self,callbacks:List[Callable],*args,**kwargs):触发事件回调forcallbackincallbacks:try:ifasyncio.iscoroutinefunction(callback):awaitcallback(*args,**kwargs)else:callback(*args,**kwargs)exceptExceptionase:self.logger.error(fEvent callback error:{e})def_update_metrics(self,result:TaskResult):更新性能指标self.metrics[tasks_executed]1ifresult.statusTaskStatus.SUCCESS:self.metrics[tasks_succeeded]1ifresult.execution_time:self.metrics[total_execution_time]result.execution_time self.metrics[average_execution_time](self.metrics[total_execution_time]/self.metrics[tasks_executed])asyncdefsubmit_task(self,func_name:str,*args,name:strNone,schedule_type:strimmediate,execute_at:Optional[datetime]None,max_retries:int3,priority:TaskPriorityTaskPriority.NORMAL,**kwargs)-str:提交新任务taskTask(namenameorfunc_name,func_namefunc_name,argslist(args),kwargskwargs,schedule_typeschedule_type,execute_atexecute_at,max_retriesmax_retries,prioritypriority,)task_idawaitself.backend.enqueue(task)returntask_idasyncdefget_task_status(self,task_id:str)-Optional[Dict[str,Any]]:获取任务状态taskawaitself.backend.get_task(task_id)ifnottask:returnNonereturn{id:task.id,name:task.name,status:task.status.value,created_at:task.created_at,started_at:task.started_at,completed_at:task.completed_at,attempts:task.attempts,error:task.error,}asyncdefcancel_task(self,task_id:str)-bool:取消任务taskawaitself.backend.get_task(task_id)ifnottask:returnFalse# 如果任务正在运行尝试取消iftask_idinself.running_tasks:self.running_tasks[task_id].cancel()# 更新任务状态task.statusTaskStatus.CANCELLED task.completed_atdatetime.now()awaitself.backend.update_task(task)returnTruedefget_metrics(self)-Dict[str,Any]:获取调度器指标ifnotself.metrics:return{}return{**self.metrics,running_tasks:len(self.running_tasks),registered_tasks:len(self.registry.tasks),is_running:self.is_running,}# 示例任务函数if__name____main__:# 配置日志logging.basicConfig(levellogging.INFO)# 创建调度器schedulerTaskScheduler(max_workers5)# 注册任务scheduler.task(nameprocess_data,max_retries3)asyncdefprocess_data(data:str,multiplier:int1):示例异步任务awaitasyncio.sleep(1)# 模拟耗时操作processeddata.upper()*multiplierreturn{processed:processed,length:len(processed)}scheduler.task(namecalculate_sum)defcalculate_sum(numbers:List[int]):示例同步任务time.sleep(0.5)# 模拟耗时操作returnsum(numbers)# 添加事件监听器asyncdefon_task_started(task:Task):print(fTask started:{task.name}({task.id}))asyncdefon_task_completed(task:Task,result:Any):print(fTask completed:{task.name}- Result:{result})scheduler.on_task_started.append(on_task_started)scheduler.on_task_completed.append(on_task_completed)asyncdefmain():# 启动调度器awaitscheduler.start()# 提交任务task1_idawaitscheduler.submit_task(process_data,hello world,multiplier3,priorityTaskPriority.HIGH)task2_idawaitscheduler.submit_task(calculate_sum,[1,2,3,4,5],nameCalculate numbers sum)# 提交延迟任务fromdatetimeimportdatetime,timedelta future_timedatetime.now()timedelta(seconds5)task3_idawaitscheduler.submit_task(process_data,delayed task,schedule_typedelayed,execute_atfuture_time,nameDelayed processing)# 等待一段时间查看结果awaitasyncio.sleep(3)# 获取任务状态status1awaitscheduler.get_task_status(task1_id)status2awaitscheduler.get_task_status(task2_id)print(f\nTask 1 Status:{status1})print(fTask 2 Status:{status2})# 获取指标metricsscheduler.get_metrics()print(f\nScheduler Metrics:{metrics})# 停止调度器awaitscheduler.stop()asyncio.run(main())4. WebSocket服务器深度实现 {#websocket服务器}4.1 WebSocket服务器核心 高性能WebSocket服务器实现 支持连接管理、房间系统、广播、心跳检测、消息压缩 importasyncioimportjsonimportzlibimporttimeimportuuidimporthashlibimportbase64fromtypingimportAny,Dict,List,Optional,Set,Callable,Unionfromdataclassesimportdataclass,fieldfromenumimportEnumfromcollectionsimportdefaultdictimportloggingimportstructimportsslfromdatetimeimportdatetime,timedeltaimportwebsocketsfromwebsockets.serverimportWebSocketServerProtocolfromwebsockets.exceptionsimportConnectionClosedclassConnectionState(Enum):连接状态CONNECTINGconnectingCONNECTEDconnectedDISCONNECTINGdisconnectingDISCONNECTEDdisconnectedclassMessageType(Enum):消息类型TEXT1BINARY2PING9PONG10CLOSE8dataclassclassWebSocketClient:WebSocket客户端id:strwebsocket:WebSocketServerProtocol state:ConnectionStateConnectionState.CONNECTING connected_at:datetimefield(default_factorydatetime.now)last_activity:datetimefield(default_factorydatetime.now)# 客户端信息remote_address:Optional[str]Noneuser_agent:Optional[str]Noneheaders:Dict[str,str]field(default_factorydict)# 会话数据session:Dict[str,Any]field(default_factorydict)subscriptions:Set[str]field(default_factoryset)# 订阅的主题/房间# 消息统计messages_sent:int0messages_received:int0bytes_sent:int0bytes_received:int0# 心跳last_ping:Optional[datetime]Noneping_count:int0defupdate_activity(self):更新活动时间self.last_activitydatetime.now()defis_active(self,timeout:int30)-bool:检查客户端是否活跃inactive_for(datetime.now()-self.last_activity).total_seconds()returninactive_fortimeoutdefsubscribe(self,topic:str):订阅主题self.subscriptions.add(topic)defunsubscribe(self,topic:str):取消订阅self.subscriptions.discard(topic)defis_subscribed(self,topic:str)-bool:检查是否订阅了主题returntopicinself.subscriptionsclassWebSocketMessage:WebSocket消息封装def__init__(self,type:MessageTypeMessageType.TEXT,data:AnyNone,compressed:boolFalse,metadata:Optional[Dict[str,Any]]None):self.idstr(uuid.uuid4())self.typetypeself.datadata self.compressedcompressed self.metadatametadataor{}self.timestampdatetime.now()self.sender:Optional[WebSocketClient]Nonedefto_dict(self)-Dict[str,Any]:转换为字典return{id:self.id,type:self.type.value,data:self.data,compressed:self.compressed,metadata:self.metadata,timestamp:self.timestamp.isoformat(),}defto_json(self)-str:转换为JSON字符串returnjson.dumps(self.to_dict(),defaultstr)classmethoddeffrom_json(cls,json_str:str)-WebSocketMessage:从JSON创建消息datajson.loads(json_str)returncls(typeMessageType(data[type]),datadata[data],compresseddata.get(compressed,False),metadatadata.get(metadata,{}))defcompress(self,level:int6)-bytes:压缩消息ifself.compressed:returnself.dataifisinstance(self.data,bytes)elseself.data.encode()json_strself.to_json()compressedzlib.compress(json_str.encode(),levellevel)self.compressedTrueself.datacompressedreturncompresseddefdecompress(self)-Dict[str,Any]:解压消息ifnotself.compressed:returnself.to_dict()ifisinstance(self.data,str):self.dataself.data.encode()decompressedzlib.decompress(self.data).decode()datajson.loads(decompressed)self.compressedFalseself.datadata.get(data)self.metadatadata.get(metadata,{})returndataclassWebSocketServer:WebSocket服务器def__init__(self,host:strlocalhost,port:int8765,ssl_context:Optional[ssl.SSLContext]None,max_connections:int10000,ping_interval:int30,ping_timeout:int10,message_size_limit:int16*1024*1024,# 16MBenable_compression:boolTrue,enable_stats:boolTrue):self.hosthost self.portport self.ssl_contextssl_context self.max_connectionsmax_connections self.ping_intervalping_interval self.ping_timeoutping_timeout# 连接管理self.clients:Dict[str,WebSocketClient]{}self.rooms:Dict[str,Set[str]]defaultdict(set)# room - client_idsself.topic_subscribers:Dict[str,Set[str]]defaultdict(set)# topic - client_ids# 消息处理器self.message_handlers:Dict[str,Callable]{}self.default_handler:Optional[Callable]None# 服务器状态self.is_runningFalseself.server:Optional[websockets.Server]Noneself.stats{connections_total:0,connections_active:0,messages_sent:0,messages_received:0,bytes_sent:0,bytes_received:0,start_time:None,uptime:0,}ifenable_statselseNone# 心跳任务self.heartbeat_task:Optional[asyncio.Task]None# 事件回调self.on_connect_callbacks:List[Callable][]self.on_disconnect_callbacks:List[Callable][]self.on_message_callbacks:List[Callable][]self.loggerlogging.getLogger(websocket_server)# 配置WebSocket服务器self.websocket_server_kwargs{max_size:message_size_limit,ping_interval:ping_interval,ping_timeout:ping_timeout,compression:deflateifenable_compressionelseNone,}asyncdefstart(self):启动WebSocket服务器ifself.is_running:returntry:self.serverawaitwebsockets.serve(self._handle_connection,self.host,self.port,sslself.ssl_context,**self.websocket_server_kwargs)self.is_runningTrueself.stats[start_time]datetime.now()# 启动心跳任务self.heartbeat_taskasyncio.create_task(self._heartbeat_loop())self.logger.info(fWebSocket server started on{self.host}:{self.port})exceptExceptionase:self.logger.error(fFailed to start WebSocket server:{e})raiseasyncdefstop(self):停止WebSocket服务器ifnotself.is_running:returnself.is_runningFalse# 停止心跳任务ifself.heartbeat_task:self.heartbeat_task.cancel()try:awaitself.heartbeat_taskexceptasyncio.CancelledError:pass# 关闭所有客户端连接disconnect_tasks[]forclient_id,clientinlist(self.clients.items()):disconnect_tasks.append(self.disconnect_client(client_id))ifdisconnect_tasks:awaitasyncio.gather(*disconnect_tasks,return_exceptionsTrue)# 关闭服务器ifself.server:self.server.close()awaitself.server.wait_closed()self.logger.info(WebSocket server stopped)asyncdef_handle_connection(self,websocket:WebSocketServerProtocol,path:str):处理新连接client_idself._generate_client_id(websocket)# 检查连接限制iflen(self.clients)self.max_connections:awaitwebsocket.close(1008,Server is at capacity)return# 创建客户端对象clientWebSocketClient(idclient_id,websocketwebsocket,remote_addressf{websocket.remote_address[0]}:{websocket.remote_address[1]},headersdict(websocket.request_headers),user_agentwebsocket.request_headers.get(User-Agent),)self.clients[client_id]client# 更新统计ifself.stats:self.stats[connections_total]1self.stats[connections_active]1# 触发连接事件awaitself._trigger_event(self.on_connect_callbacks,client)self.logger.info(fClient connected:{client_id}from{client.remote_address})try:# 处理消息循环awaitself._message_loop(client)exceptConnectionClosedase:self.logger.info(fClient disconnected:{client_id}-{e})exceptExceptionase:self.logger.error(fError handling client{client_id}:{e})finally:# 断开连接处理awaitself.disconnect_client(client_id)asyncdef_message_loop(self,client:WebSocketClient):处理客户端消息循环client.stateConnectionState.CONNECTEDasyncformessageinclient.websocket:client.update_activity()# 更新统计client.messages_received1client.bytes_receivedlen(message)ifself.stats:self.stats[messages_received]1self.stats[bytes_received]len(message)try:# 处理消息awaitself._process_message(client,message)exceptExceptionase:self.logger.error(fError processing message from{client.id}:{e})# 发送错误响应error_msgWebSocketMessage(typeMessageType.TEXT,data{error:str(e),type:processing_error},metadata{timestamp:datetime.now().isoformat()})awaitself.send_to_client(client.id,error_msg)asyncdef_process_message(self,client:WebSocketClient,raw_message:Union[str,bytes]):处理单个消息ws_messageNonetry:# 解析消息ifisinstance(raw_message,str):# JSON消息datajson.loads(raw_message)# 检查消息格式ifisinstance(data,dict)andtypeindata:# 结构化消息message_typedata.get(type,message)message_datadata.get(data,{})metadatadata.get(metadata,{})ws_messageWebSocketMessage(typeMessageType.TEXT,datamessage_data,metadata{**metadata,message_type:message_type,client_id:client.id,})else:# 普通文本消息ws_messageWebSocketMessage(typeMessageType.TEXT,datadata,metadata{client_id:client.id,raw:True})elifisinstance(raw_message,bytes):# 二进制消息ws_messageWebSocketMessage(typeMessageType.BINARY,dataraw_message,metadata{client_id:client.id})ifws_message:ws_message.senderclient# 触发消息事件awaitself._trigger_event(self.on_message_callbacks,client,ws_message)# 调用消息处理器message_typews_message.metadata.get(message_type,message)handlerself.message_handlers.get(message_type,self.default_handler)ifhandler:ifasyncio.iscoroutinefunction(handler):awaithandler(client,ws_message)else:handler(client,ws_message)exceptjson.JSONDecodeErrorase:self.logger.warning(fInvalid JSON from client{client.id}:{e})# 发送错误响应error_msgWebSocketMessage(typeMessageType.TEXT,data{error:Invalid JSON format,type:validation_error},metadata{timestamp:datetime.now().isoformat()})awaitself.send_to_client(client.id,error_msg)exceptExceptionase:self.logger.error(fError in message processing pipeline:{e})raiseasyncdef_heartbeat_loop(self):心跳检测循环whileself.is_running:try:awaitasyncio.sleep(self.ping_interval)# 检查所有客户端的心跳nowdatetime.now()clients_to_disconnect[]forclient_id,clientinlist(self.clients.items()):# 检查活动状态ifnotclient.is_active(self.ping_interval*2):self.logger.warning(fClient{client_id}inactive, disconnecting)clients_to_disconnect.append(client_id)continue# 发送pingtry:ifclient.websocket.open:# 记录ping时间client.last_pingnow client.ping_count1# 发送pingawaitclient.websocket.ping()exceptConnectionClosed:clients_to_disconnect.append(client_id)exceptExceptionase:self.logger.warning(fError sending ping to{client_id}:{e})clients_to_disconnect.append(client_id)# 断开不活动的客户端forclient_idinclients_to_disconnect:awaitself.disconnect_client(client_id)# 更新统计ifself.stats:self.stats[uptime](datetime.now()-self.stats[start_time]).total_seconds()self.stats[connections_active]len(self.clients)exceptasyncio.CancelledError:breakexceptExceptionase:self.logger.error(fHeartbeat loop error:{e})asyncdefdisconnect_client(self,client_id:str,code:int1000,reason:str):断开客户端连接clientself.clients.get(client_id)ifnotclient:return# 更新状态client.stateConnectionState.DISCONNECTING# 从所有房间和订阅中移除forroominlist(client.subscriptions):self.leave_room(client_id,room)fortopicinlist(client.subscriptions):self.unsubscribe(client_id,topic)# 关闭WebSocket连接try:ifclient.websocket.open:awaitclient.websocket.close(codecode,reasonreason)exceptExceptionase:self.logger.debug(fError closing connection for{client_id}:{e})# 从客户端列表移除self.clients.pop(client_id,None)# 更新统计ifself.stats:self.stats[connections_active]len(self.clients)# 触发断开连接事件awaitself._trigger_event(self.on_disconnect_callbacks,client)self.logger.info(fClient disconnected:{client_id})asyncdefsend_to_client(self,client_id:str,message:WebSocketMessage)-bool:发送消息给指定客户端clientself.clients.get(client_id)ifnotclientornotclient.websocket.open:returnFalsetry:# 准备消息数据ifmessage.typeMessageType.TEXT:ifisinstance(message.data,(dict,list)):datajson.dumps(message.to_dict())else:datastr(message.data)else:datamessage.data# 发送消息awaitclient.websocket.send(data)# 更新统计client.messages_sent1client.bytes_sentlen(data)ifisinstance(data,str)elselen(data)ifself.stats:self.stats[messages_sent]1self.stats[bytes_sent]len(data)ifisinstance(data,str)elselen(data)returnTrueexceptConnectionClosed:awaitself.disconnect_client(client_id)returnFalseexceptExceptionase:self.logger.error(fError sending to client{client_id}:{e})returnFalseasyncdefbroadcast(self,message:WebSocketMessage,room:Optional[str]None):广播消息ifroom:# 广播到房间client_idsself.rooms.get(room,set()).copy()else:# 广播到所有客户端client_idslist(self.clients.keys())# 并发发送send_tasks[]forclient_idinclient_ids:taskself.send_to_client(client_id,message)send_tasks.append(task)ifsend_tasks:resultsawaitasyncio.gather(*send_tasks,return_exceptionsTrue)# 统计发送结果success_countsum(1forrinresultsifrisTrue)returnsuccess_countreturn0asyncdefsend_to_topic(self,topic:str,message:WebSocketMessage)-int:发送消息到主题client_idsself.topic_subscribers.get(topic,set()).copy()send_tasks[]forclient_idinclient_ids:taskself.send_to_client(client_id,message)send_tasks.append(task)ifsend_tasks:resultsawaitasyncio.gather(*send_tasks,return_exceptionsTrue)success_countsum(1forrinresultsifrisTrue)returnsuccess_countreturn0defjoin_room(self,client_id:str,room:str):客户端加入房间clientself.clients.get(client_id)ifnotclient:returnFalseself.rooms[room].add(client_id)client.subscribe(room)returnTruedefleave_room(self,client_id:str,room:str):客户端离开房间clientself.clients.get(client_id)ifnotclient:returnFalseroom_clientsself.rooms.get(room)ifroom_clients:room_clients.discard(client_id)ifnotroom_clients:delself.rooms[room]client.unsubscribe(room)returnTruedefsubscribe(self,client_id:str,topic:str):客户端订阅主题clientself.clients.get(client_id)ifnotclient:returnFalseself.topic_subscribers[topic].add(client_id)client.subscribe(topic)returnTruedefunsubscribe(self,client_id:str,topic:str):客户端取消订阅clientself.clients.get(client_id)ifnotclient:returnFalsesubscribersself.topic_subscribers.get(topic)ifsubscribers:subscribers.discard(client_id)ifnotsubscribers:delself.topic_subscribers[topic]client.unsubscribe(topic)returnTruedefregister_handler(self,message_type:str,handler:Callable):注册消息处理器self.message_handlers[message_type]handlerdefset_default_handler(self,handler:Callable):设置默认消息处理器self.default_handlerhandlerdefget_client(self,client_id:str)-Optional[WebSocketClient]:获取客户端信息returnself.clients.get(client_id)defget_room_clients(self,room:str)-List[WebSocketClient]:获取房间内的客户端client_idsself.rooms.get(room,set())return[self.clients[cid]forcidinclient_idsifcidinself.clients]defget_topic_subscribers(self,topic:str)-List[WebSocketClient]:获取主题订阅者client_idsself.topic_subscribers.get(topic,set())return[self.clients[cid]forcidinclient_idsifcidinself.clients]defget_stats(self)-Dict[str,Any]:获取服务器统计信息ifnotself.stats:return{}statsself.stats.copy()stats[clients_total]len(self.clients)stats[rooms_total]len(self.rooms)stats[topics_total]len(self.topic_subscribers)# 计算连接信息connections_by_time{}nowdatetime.now()forclientinself.clients.values():connected_minutesint((now-client.connected_at).total_seconds()/60)bucketf{connected_minutes//5*5}-{(connected_minutes//51)*5}minconnections_by_time[bucket]connections_by_time.get(bucket,0)1stats[connections_by_duration]connections_by_timereturnstatsasyncdef_trigger_event(self,callbacks:List[Callable],*args,**kwargs):触发事件回调forcallbackincallbacks:try:ifasyncio.iscoroutinefunction(callback):awaitcallback(*args,**kwargs)else:callback(*args,**kwargs)exceptExceptionase:self.logger.error(fEvent callback error:{e})def_generate_client_id(self,websocket:WebSocketServerProtocol)-str:生成客户端ID# 基于连接信息生成唯一IDremote_addrwebsocket.remote_address timestampint(time.time()*1000)id_stringf{remote_addr[0]}:{remote_addr[1]}:{timestamp}returnhashlib.md5(id_string.encode()).hexdigest()[:12]# 使用示例asyncdefexample_usage():WebSocket服务器使用示例# 创建服务器serverWebSocketServer(hostlocalhost,port8765,max_connections1000,ping_interval20,enable_compressionTrue)# 注册事件处理器asyncdefon_client_connect(client:WebSocketClient):print(fNew client connected:{client.id})# 发送欢迎消息welcome_msgWebSocketMessage(typeMessageType.TEXT,data{type:welcome,message:Welcome to the WebSocket server!,client_id:client.id,timestamp:datetime.now().isoformat()})awaitserver.send_to_client(client.id,welcome_msg)asyncdefon_client_disconnect(client:WebSocketClient):print(fClient disconnected:{client.id})asyncdefon_message_received(client:WebSocketClient,message:WebSocketMessage):print(fMessage from{client.id}:{message.data})# 回显消息echo_msgWebSocketMessage(typeMessageType.TEXT,data{type:echo,original:message.data,timestamp:datetime.now().isoformat()})awaitserver.send_to_client(client.id,echo_msg)# 注册消息处理器asyncdefhandle_chat_message(client:WebSocketClient,message:WebSocketMessage):处理聊天消息chat_datamessage.data roomchat_data.get(room,general)# 广播到房间broadcast_msgWebSocketMessage(typeMessageType.TEXT,data{type:chat_message,sender:client.id,room:room,message:chat_data.get(message),timestamp:datetime.now().isoformat()})awaitserver.broadcast(broadcast_msg,roomroom)# 设置处理器server.on_connect_callbacks.append(on_client_connect)server.on_disconnect_callbacks.append(on_client_disconnect)server.on_message_callbacks.append(on_message_received)server.register_handler(chat,handle_chat_message)# 启动服务器awaitserver.start()# 运行一段时间try:# 模拟后台任务发送消息asyncdefbackground_broadcaster():后台广播任务count0whileserver.is_running:awaitasyncio.sleep(10)count1broadcast_msgWebSocketMessage(typeMessageType.TEXT,data{type:system_notification,message:fSystem notification #{count},timestamp:datetime.now().isoformat()})sentawaitserver.broadcast(broadcast_msg)print(fBroadcasted system message to{sent}clients)# 启动后台任务broadcaster_taskasyncio.create_task(background_broadcaster())# 保持运行awaitasyncio.sleep(300)# 运行5分钟# 取消后台任务broadcaster_task.cancel()finally:# 停止服务器awaitserver.stop()if__name____main__:# 配置日志logging.basicConfig(levellogging.INFO)# 运行示例asyncio.run(example_usage())5. 前后端集成与通信协议 {#集成通信}5.1 通信协议设计 前后端通信协议 包含消息格式、错误处理、认证、状态同步 fromtypingimportDict,Any,Optional,List,UnionfromenumimportEnumfrompydanticimportBaseModel,Field,validatorimportjsonfromdatetimeimportdatetimeclassMessageType(str,Enum):消息类型枚举# 系统消息CONNECTconnectDISCONNECTdisconnectPINGpingPONGpongERRORerror# 认证消息AUTH_REQUESTauth_requestAUTH_RESPONSEauth_response# 数据消息DATA_UPDATEdata_updateDATA_REQUESTdata_requestDATA_RESPONSEdata_response# 控制消息SUBSCRIBEsubscribeUNSUBSCRIBEunsubscribeCOMMANDcommand# 实时消息CHAT_MESSAGEchat_messageNOTIFICATIONnotificationPRESENCEpresenceclassErrorCode(str,Enum):错误代码枚举# 系统错误INTERNAL_ERRORinternal_errorTIMEOUTtimeoutRATE_LIMITrate_limit# 认证错误UNAUTHORIZEDunauthorizedINVALID_TOKENinvalid_tokenPERMISSION_DENIEDpermission_denied# 数据错误VALIDATION_ERRORvalidation_errorNOT_FOUNDnot_foundCONFLICTconflict# 连接错误CONNECTION_CLOSEDconnection_closedPROTOCOL_ERRORprotocol_errorclassMessage(BaseModel):基础消息模型id:strField(default_factorylambda:str(uuid.uuid4()))type:MessageType data:Optional[Dict[str,Any]]Nonemetadata:Dict[str,Any]Field(default_factorydict)timestamp:datetimeField(default_factorydatetime.now)classConfig:use_enum_valuesTruejson_encoders{datetime:lambdadt:dt.isoformat()}defto_dict(self)-Dict[str,Any]:转换为字典returnjson.loads(self.json())classmethoddeffrom_dict(cls,data:Dict[str,Any])-Message:从字典创建消息returncls(**data)classAuthMessage(Message):认证消息token:Optional[str]Noneuser_id:Optional[str]Nonepermissions:List[str]Field(default_factorylist)validator(type)defvalidate_type(cls,v):allowed[MessageType.AUTH_REQUEST,MessageType.AUTH_RESPONSE]ifvnotinallowed:raiseValueError(fAuth message type must be one of{allowed})returnvclassDataMessage(Message):数据消息collection:Optional[str]Noneoperation:Optional[str]None# create, read, update, deletequery:Optional[Dict[str,Any]]Nonepayload:Optional[Dict[str,Any]]Nonevalidator(type)defvalidate_type(cls,v):allowed[MessageType.DATA_REQUEST,MessageType.DATA_RESPONSE,MessageType.DATA_UPDATE]ifvnotinallowed:raiseValueError(fData message type must be one of{allowed})returnvclassCommandMessage(Message):命令消息command:strparameters:Dict[str,Any]Field(default_factorydict)expect_response:boolTruevalidator(type)defvalidate_type(cls,v):ifv!MessageType.COMMAND:raiseValueError(Command message type must be command)returnvclassErrorMessage(Message):错误消息error_code:ErrorCode error_message:strdetails:Optional[Dict[str,Any]]Nonevalidator(type)defvalidate_type(cls,v):ifv!MessageType.ERROR:raiseValueError(Error message type must be error)returnvclassmethoddeffrom_exception(cls,exc:Exception,error_code:ErrorCodeErrorCode.INTERNAL_ERROR):从异常创建错误消息returncls(error_codeerror_code,error_messagestr(exc),details{exception_type:exc.__class__.__name__})classPresenceMessage(Message):在线状态消息user_id:strstatus:str# online, away, offline, busylast_seen:Optional[datetime]Nonecustom_status:Optional[str]Nonevalidator(type)defvalidate_type(cls,v):ifv!MessageType.PRESENCE:raiseValueError(Presence message type must be presence)returnvclassSubscriptionMessage(Message):订阅消息topics:List[str]unsubscribe:boolFalsevalidator(type)defvalidate_type(cls,v):allowed[MessageType.SUBSCRIBE,MessageType.UNSUBSCRIBE]ifvnotinallowed:raiseValueError(fSubscription message type must be one of{allowed})returnvclassMessageFactory:消息工厂类staticmethoddefcreate_connect_message(user_id:strNone)-Message:创建连接消息returnMessage(typeMessageType.CONNECT,data{user_id:user_id}ifuser_idelse{})staticmethoddefcreate_ping_message()-Message:创建ping消息returnMessage(typeMessageType.PING)staticmethoddefcreate_pong_message()-Message:创建pong消息returnMessage(typeMessageType.PONG)staticmethoddefcreate_auth_request(token:str)-AuthMessage:创建认证请求returnAuthMessage(typeMessageType.AUTH_REQUEST,tokentoken)staticmethoddefcreate_auth_response(success:bool,user_id:strNone,permissions:List[str]None)-AuthMessage:创建认证响应returnAuthMessage(typeMessageType.AUTH_RESPONSE,data{success:success},user_iduser_id,permissionspermissionsor[])staticmethoddefcreate_data_request(collection:str,operation:str,query:Dict[str,Any]None,payload:Dict[str,Any]None)-DataMessage:创建数据请求returnDataMessage(typeMessageType.DATA_REQUEST,collectioncollection,operationoperation,queryquery,payloadpayload)staticmethoddefcreate_data_response(data:Any,collection:strNone,operation:strNone)-DataMessage:创建数据响应returnDataMessage(typeMessageType.DATA_RESPONSE,data{result:data},collectioncollection,operationoperation)staticmethoddefcreate_error(error_code:ErrorCode,error_message:str,details:Dict[str,Any]None)-ErrorMessage:创建错误消息returnErrorMessage(typeMessageType.ERROR,error_codeerror_code,error_messageerror_message,detailsdetails)staticmethoddefcreate_command(command:str,parameters:Dict[str,Any]None,expect_response:boolTrue)-CommandMessage:创建命令消息returnCommandMessage(typeMessageType.COMMAND,commandcommand,parametersparametersor{},expect_responseexpect_response)staticmethoddefcreate_subscription(topics:List[str],unsubscribe:boolFalse)-SubscriptionMessage:创建订阅消息message_typeMessageType.UNSUBSCRIBEifunsubscribeelseMessageType.SUBSCRIBEreturnSubscriptionMessage(typemessage_type,topicstopics)staticmethoddefcreate_presence(user_id:str,status:str,custom_status:strNone)-PresenceMessage:创建在线状态消息returnPresenceMessage(typeMessageType.PRESENCE,user_iduser_id,statusstatus,custom_statuscustom_status,last_seendatetime.now())classMessageRouter:消息路由器def__init__(self):self.handlers:Dict[str,List[Callable]]defaultdict(list)self.middleware:List[Callable][]defregister_handler(self,message_type:str,handler:Callable):注册消息处理器self.handlers[message_type].append(handler)defregister_middleware(self,middleware:Callable):注册中间件self.middleware.append(middleware)asyncdefroute_message(self,client_id:str,message:Message)-Optional[Message]:路由消息responseNonetry:# 执行中间件formiddlewareinself.middleware:resultawaitmiddleware(client_id,message)ifisinstance(result,Message):# 中间件返回了响应returnresultelifresultisFalse:# 中间件拒绝处理returnMessageFactory.create_error(ErrorCode.UNAUTHORIZED,Message rejected by middleware)# 查找处理器handlersself.handlers.get(message.type,[])ifnothandlers:# 没有处理器返回错误returnMessageFactory.create_error(ErrorCode.PROTOCOL_ERROR,fNo handler for message type:{message.type})# 执行处理器forhandlerinhandlers:resultawaithandler(client_id,message)ifisinstance(result,Message):responseresultbreakexceptExceptionase:# 处理异常responseMessageFactory.create_error(ErrorCode.INTERNAL_ERROR,str(e),{exception:e.__class__.__name__})returnresponse6. 完整实战案例实时股票监控系统 {#实战案例} 实时股票监控系统 功能实时股票数据推送、价格预警、技术指标计算、交易信号生成 importasyncioimportjsonimporttimefromdatetimeimportdatetime,timedeltafromtypingimportDict,List,Optional,Setimportrandomimportloggingfromdataclassesimportdataclass,fieldfromcollectionsimportdefaultdict# 导入之前定义的组件fromtask_schedulerimportTaskScheduler,Task,TaskPriorityfromwebsocket_serverimportWebSocketServer,WebSocketMessage,MessageTypefrommessage_protocolimport(MessageFactory,MessageRouter,MessageTypeasMsgType,ErrorCode,SubscriptionMessage)dataclassclassStockData:股票数据symbol:strprice:floatvolume:inttimestamp:datetime change:float0.0# 涨跌幅change_percent:float0.0open:float0.0high:float0.0low:float0.0close:float0.0bid:float0.0ask:float0.0bid_size:int0ask_size:int0defto_dict(self)-Dict[str,Any]:转换为字典return{symbol:self.symbol,price:self.price,volume:self.volume,timestamp:self.timestamp.isoformat(),change:self.change,change_percent:self.change_percent,open:self.open,high:self.high,low:self.low,close:self.close,bid:self.bid,ask:self.ask,bid_size:self.bid_size,ask_size:self.ask_size,}dataclassclassAlert:价格预警id:strsymbol:strcondition:str# above, below, cross_above, cross_belowprice:floatuser_id:strcreated_at:datetimefield(default_factorydatetime.now)triggered_at:Optional[datetime]Nonetriggered_price:Optional[float]Noneactive:boolTruedefcheck(self,current_price:float,previous_price:floatNone)-bool:检查预警是否触发ifnotself.active:returnFalseifself.conditionabove:returncurrent_priceself.priceelifself.conditionbelow:returncurrent_priceself.priceelifself.conditioncross_aboveandprevious_price:returnprevious_priceself.pricecurrent_priceelifself.conditioncross_belowandprevious_price:returnprevious_priceself.pricecurrent_pricereturnFalseclassTechnicalIndicator:技术指标计算器staticmethoddefcalculate_sma(prices:List[float],period:int)-Optional[float]:计算简单移动平均线iflen(prices)period:returnNonereturnsum(prices[-period:])/periodstaticmethoddefcalculate_ema(prices:List[float],period:int)-Optional[float]:计算指数移动平均线iflen(prices)period:returnNonesmaTechnicalIndicator.calculate_sma(prices[:period],period)ifsmaisNone:returnNonemultiplier2/(period1)emasmaforpriceinprices[period:]:ema(price-ema)*multiplieremareturnemastaticmethoddefcalculate_rsi(prices:List[float],period:int14)-Optional[float]:计算相对强弱指数iflen(prices)period:returnNonegains[]losses[]foriinrange(1,len(prices)):changeprices[i]-prices[i-1]ifchange0:gains.append(change)losses.append(0)else:gains.append(0)losses.append(abs(change))avg_gainsum(gains[:period])/period avg_losssum(losses[:period])/periodforiinrange(period,len(gains)):avg_gain(avg_gain*(period-1)gains[i])/period avg_loss(avg_loss*(period-1)losses[i])/periodifavg_loss0:return100rsavg_gain/avg_loss rsi100-(100/(1rs))returnrsistaticmethoddefcalculate_macd(prices:List[float],fast_period:int12,slow_period:int26,signal_period:int9)-Dict[str,Optional[float]]:计算MACDiflen(prices)slow_period:return{macd:None,signal:None,histogram:None}fast_emaTechnicalIndicator.calculate_ema(prices,fast_period)slow_emaTechnicalIndicator.calculate_ema(prices,slow_period)iffast_emaisNoneorslow_emaisNone:return{macd:None,signal:None,histogram:None}macdfast_ema-slow_ema# 计算信号线macd_values[]foriinrange(len(prices)-slow_period1):fastTechnicalIndicator.calculate_ema(prices[i:],fast_period)slowTechnicalIndicator.calculate_ema(prices[i:],slow_period)iffastisnotNoneandslowisnotNone:macd_values.append(fast-slow)signalTechnicalIndicator.calculate_ema(macd_values,signal_period)histogrammacd-signalifsignalisnotNoneelseNonereturn{macd:macd,signal:signal,histogram:histogram}classStockMarketSimulator:股票市场模拟器def__init__(self):self.stocks:Dict[str,StockData]{}self.price_history:Dict[str,List[StockData]]defaultdict(list)self.initial_prices:Dict[str,float]{}# 初始化一些股票self._initialize_stocks()def_initialize_stocks(self):初始化股票数据stocks_config[(AAPL,175.0,0.02),(GOOGL,135.0,0.015),(MSFT,330.0,0.018),(TSLA,240.0,0.025),(AMZN,145.0,0.022),(META,320.0,0.02),(NVDA,480.0,0.03),(NFLX,560.0,0.019),(AMD,125.0,0.021),(INTC,44.0,0.012),]forsymbol,price,volatilityinstocks_config:self.initial_prices[symbol]price stockStockData(symbolsymbol,priceprice,volumerandom.randint(1000,10000),timestampdatetime.now(),openprice,highprice,lowprice,closeprice,bidprice*0.999,askprice*1.001,bid_sizerandom.randint(100,1000),ask_sizerandom.randint(100,1000))self.stocks[symbol]stock self.price_history[symbol].append(stock)defupdate_prices(self):更新股票价格模拟市场波动forsymbol,stockinself.stocks.items():previous_pricestock.price# 随机波动volatility0.02# 2%波动change_percentrandom.uniform(-volatility,volatility)# 趋势成分轻微trendrandom.uniform(-0.001,0.001)# 随机事件偶尔大幅波动event0.0ifrandom.random()0.01:# 1%概率eventrandom.uniform(-0.05,0.05)total_changechange_percenttrendevent# 更新价格new_priceprevious_price*(1total_change)# 更新股票数据stock.priceround(new_price,2)stock.changeround(new_price-previous_price,2)stock.change_percentround((new_price-previous_price)/previous_price*100,2)stock.volumerandom.randint(1000,50000)stock.timestampdatetime.now()# 更新最高价/最低价stock.highmax(stock.high,stock.price)stock.lowmin(stock.low,stock.price)# 更新买卖价stock.bidround(stock.price*0.999,2)stock.askround(stock.price*1.001,2)stock.bid_sizerandom.randint(100,1000)stock.ask_sizerandom.randint(100,1000)# 保存历史self.price_history[symbol].append(stock)# 保持历史数据长度iflen(self.price_history[symbol])1000:self.price_history[symbol].pop(0)defget_stock(self,symbol:str)-Optional[StockData]:获取股票数据returnself.stocks.get(symbol)defget_all_stocks(self)-List[StockData]:获取所有股票数据returnlist(self.stocks.values())defget_price_history(self,symbol:str,limit:int100)-List[StockData]:获取价格历史historyself.price_history.get(symbol,[])returnhistory[-limit:]iflimitelsehistorydefcalculate_indicators(self,symbol:str)-Dict[str,Any]:计算技术指标historyself.price_history.get(symbol,[])ifnothistory:return{}prices[data.pricefordatainhistory]return{sma_20:TechnicalIndicator.calculate_sma(prices,20),sma_50:TechnicalIndicator.calculate_sma(prices,50),ema_12:TechnicalIndicator.calculate_ema(prices,12),ema_26:TechnicalIndicator.calculate_ema(prices,26),rsi_14:TechnicalIndicator.calculate_rsi(prices,14),macd:TechnicalIndicator.calculate_macd(prices),}classRealTimeStockMonitor:实时股票监控系统def__init__(self,websocket_port:int8765,redis_url:strredis://localhost:6379):# 初始化组件self.market_simulatorStockMarketSimulator()self.task_schedulerTaskScheduler(max_workers10)self.websocket_serverWebSocketServer(portwebsocket_port)self.message_routerMessageRouter()# 预警系统self.alerts:Dict[str,List[Alert]]defaultdict(list)self.triggered_alerts:Dict[str,List[Alert]]defaultdict(list)# 客户端订阅self.client_subscriptions:Dict[str,Set[str]]defaultdict(set)# client_id - symbols# 状态追踪self.is_runningFalseself.loggerlogging.getLogger(stock_monitor)# 注册任务和处理器self._register_tasks()self._register_message_handlers()self._setup_websocket_handlers()def_register_tasks(self):注册后台任务self.task_scheduler.task(nameupdate_market_data)asyncdefupdate_market_data_task():更新市场数据任务self.market_simulator.update_prices()# 获取所有股票数据stocksself.market_simulator.get_all_stocks()# 检查预警awaitself._check_alerts()# 广播更新awaitself._broadcast_stock_updates(stocks)return{stocks_updated:len(stocks)}self.task_scheduler.task(namecalculate_indicators)asyncdefcalculate_indicators_task():计算技术指标任务indicators{}forsymbolinself.market_simulator.stocks.keys():indicators[symbol]self.market_simulator.calculate_indicators(symbol)# 广播技术指标awaitself._broadcast_indicators(indicators)return{symbols_processed:len(indicators)}self.task_scheduler.task(namecleanup_old_data)asyncdefcleanup_old_data_task():清理旧数据任务# 清理过期的预警awaitself._cleanup_expired_alerts()return{cleaned:True}def_register_message_handlers(self):注册消息处理器asyncdefhandle_subscription(client_id:str,message:SubscriptionMessage):处理订阅消息ifmessage.unsubscribe:# 取消订阅fortopicinmessage.topics:iftopicinself.client_subscriptions[client_id]:self.client_subscriptions[client_id].remove(topic)self.websocket_server.unsubscribe(client_id,topic)else:# 订阅fortopicinmessage.topics:self.client_subscriptions[client_id].add(topic)self.websocket_server.subscribe(client_id,topic)# 发送当前数据iftopic.startswith(stock:):symboltopic.replace(stock:,)stockself.market_simulator.get_stock(symbol)ifstock:awaitself._send_stock_data(client_id,symbol,stock)returnMessageFactory.create_data_response({subscribed:message.topics,unsubscribe:message.unsubscribe})asyncdefhandle_data_request(client_id:str,message:DataMessage):处理数据请求collectionmessage.collection operationmessage.operation querymessage.queryor{}ifcollectionstocks:ifoperationlist:# 获取股票列表stocksself.market_simulator.get_all_stocks()stocks_data[stock.to_dict()forstockinstocks]returnMessageFactory.create_data_response(stocks_data)elifoperationget:# 获取单个股票symbolquery.get(symbol)ifnotsymbol:returnMessageFactory.create_error(ErrorCode.VALIDATION_ERROR,Symbol is required)stockself.market_simulator.get_stock(symbol)ifnotstock:returnMessageFactory.create_error(ErrorCode.NOT_FOUND,fStock{symbol}not found)returnMessageFactory.create_data_response(stock.to_dict())elifoperationhistory:# 获取历史数据symbolquery.get(symbol)limitquery.get(limit,100)ifnotsymbol:returnMessageFactory.create_error(ErrorCode.VALIDATION_ERROR,Symbol is required)historyself.market_simulator.get_price_history(symbol,limit)history_data[data.to_dict()fordatainhistory]returnMessageFactory.create_data_response(history_data)elifcollectionalerts:ifoperationlist:# 获取用户预警user_idquery.get(user_id)alertsself._get_user_alerts(user_id)ifuser_idelse[]alerts_data[{id:alert.id,symbol:alert.symbol,condition:alert.condition,price:alert.price,active:alert.active,created_at:alert.created_at.isoformat(),triggered_at:alert.triggered_at.isoformat()ifalert.triggered_atelseNone,triggered_price:alert.triggered_price}foralertinalerts]returnMessageFactory.create_data_response(alerts_data)elifoperationcreate:# 创建预警symbolquery.get(symbol)conditionquery.get(condition)pricequery.get(price)user_idquery.get(user_id)ifnotall([symbol,condition,price,user_id]):returnMessageFactory.create_error(ErrorCode.VALIDATION_ERROR,Missing required fields)alertself._create_alert(symbol,condition,price,user_id)returnMessageFactory.create_data_response({id:alert.id,message:Alert created successfully})elifoperationdelete:# 删除预警alert_idquery.get(id)user_idquery.get(user_id)ifnotalert_idornotuser_id:returnMessageFactory.create_error(ErrorCode.VALIDATION_ERROR,Alert ID and user ID are required)successself._delete_alert(alert_id,user_id)returnMessageFactory.create_data_response({success:success,message:Alert deletedifsuccesselseAlert not found})returnMessageFactory.create_error(ErrorCode.NOT_FOUND,fUnknown collection or operation:{collection}/{operation})asyncdefhandle_command(client_id:str,message:CommandMessage):处理命令消息commandmessage.command paramsmessage.parametersifcommandget_stats:# 获取系统统计stats{total_stocks:len(self.market_simulator.stocks),total_alerts:sum(len(alerts)foralertsinself.alerts.values()),active_connections:len(self.websocket_server.clients),server_uptime:time.time()-(self.websocket_server.stats.get(start_time,datetime.now()).timestamp()ifself.websocket_server.statselsetime.time()),}returnMessageFactory.create_data_response(stats)elifcommandsimulate_event:# 模拟市场事件symbolparams.get(symbol)impactparams.get(impact,0.1)# 10% impactifsymbolandsymbolinself.market_simulator.stocks:stockself.market_simulator.stocks[symbol]stock.price*(1impact)returnMessageFactory.create_data_response({message:fSimulated event for{symbol},new_price:stock.price})returnMessageFactory.create_error(ErrorCode.NOT_FOUND,fUnknown command:{command})# 注册处理器self.message_router.register_handler(MsgType.SUBSCRIBE,handle_subscription)self.message_router.register_handler(MsgType.UNSUBSCRIBE,handle_subscription)self.message_router.register_handler(MsgType.DATA_REQUEST,handle_data_request)self.message_router.register_handler(MsgType.COMMAND,handle_command)def_setup_websocket_handlers(self):设置WebSocket处理器asyncdefon_client_connect(client):客户端连接事件self.logger.info(fClient connected:{client.id})# 发送欢迎消息welcome_msgWebSocketMessage(typeMessageType.TEXT,data{type:welcome,client_id:client.id,message:Connected to Stock Monitor,timestamp:datetime.now().isoformat()})awaitself.websocket_server.send_to_client(client.id,welcome_msg)asyncdefon_client_disconnect(client):客户端断开连接事件self.logger.info(fClient disconnected:{client.id})# 清理客户端订阅ifclient.idinself.client_subscriptions:delself.client_subscriptions[client.id]asyncdefon_message_received(client,message:WebSocketMessage):消息接收事件try:# 解析消息ifisinstance(message.data,dict)andtypeinmessage.data:msg_typemessage.data.get(type)# 转换为协议消息protocol_messageMessage.from_dict(message.data)# 路由消息responseawaitself.message_router.route_message(client.id,protocol_message)ifresponse:# 发送响应response_msgWebSocketMessage(typeMessageType.TEXT,dataresponse.to_dict())awaitself.websocket_server.send_to_client(client.id,response_msg)exceptExceptionase:self.logger.error(fError processing message:{e})# 发送错误响应error_msgWebSocketMessage(typeMessageType.TEXT,data{type:error,error:str(e),timestamp:datetime.now().isoformat()})awaitself.websocket_server.send_to_client(client.id,error_msg)# 设置WebSocket事件处理器self.websocket_server.on_connect_callbacks.append(on_client_connect)self.websocket_server.on_disconnect_callbacks.append(on_client_disconnect)self.websocket_server.on_message_callbacks.append(on_message_received)self.websocket_server.set_default_handler(on_message_received)asyncdef_check_alerts(self):检查预警forsymbol,alertsinself.alerts.items():stockself.market_simulator.get_stock(symbol)ifnotstock:continue# 获取前一个价格historyself.market_simulator.get_price_history(symbol,2)previous_pricehistory[0].priceiflen(history)1elsestock.priceforalertinalerts[:]:# 使用切片创建副本ifalert.activeandalert.check(stock.price,previous_price):# 触发预警alert.activeFalsealert.triggered_atdatetime.now()alert.triggered_pricestock.price self.triggered_alerts[symbol].append(alert)# 发送预警通知awaitself._send_alert_notification(alert,stock)asyncdef_send_alert_notification(self,alert:Alert,stock:StockData):发送预警通知notification_msgWebSocketMessage(typeMessageType.TEXT,data{type:alert_triggered,alert_id:alert.id,symbol:alert.symbol,condition:alert.condition,target_price:alert.price,current_price:stock.price,triggered_at:alert.triggered_at.isoformat(),user_id:alert.user_id})# 发送给特定用户这里简化处理awaitself.websocket_server.broadcast(notification_msg)asyncdef_broadcast_stock_updates(self,stocks:List[StockData]):广播股票更新forstockinstocks:update_msgWebSocketMessage(typeMessageType.TEXT,data{type:stock_update,symbol:stock.symbol,data:stock.to_dict(),timestamp:datetime.now().isoformat()})# 广播到订阅该股票的客户端topicfstock:{stock.symbol}awaitself.websocket_server.send_to_topic(topic,update_msg)asyncdef_broadcast_indicators(self,indicators:Dict[str,Dict[str,Any]]):广播技术指标forsymbol,indicator_datainindicators.items():ifindicator_data:indicator_msgWebSocketMessage(typeMessageType.TEXT,data{type:indicators_update,symbol:symbol,indicators:indicator_data,timestamp:datetime.now().isoformat()})topicfindicators:{symbol}awaitself.websocket_server.send_to_topic(topic,indicator_msg)asyncdef_send_stock_data(self,client_id:str,symbol:str,stock:StockData):发送股票数据给客户端stock_msgWebSocketMessage(typeMessageType.TEXT,data{type:stock_data,symbol:symbol,data:stock.to_dict(),timestamp:datetime.now().isoformat()})awaitself.websocket_server.send_to_client(client_id,stock_msg)def_create_alert(self,symbol:str,condition:str,price:float,user_id:str)-Alert:创建预警alert_idstr(uuid.uuid4())alertAlert(idalert_id,symbolsymbol,conditioncondition,priceprice,user_iduser_id)self.alerts[symbol].append(alert)returnalertdef_delete_alert(self,alert_id:str,user_id:str)-bool:删除预警forsymbol,alertsinself.alerts.items():foralertinalerts:ifalert.idalert_idandalert.user_iduser_id:alerts.remove(alert)returnTruereturnFalsedef_get_user_alerts(self,user_id:str)-List[Alert]:获取用户预警user_alerts[]foralertsinself.alerts.values():foralertinalerts:ifalert.user_iduser_id:user_alerts.append(alert)returnuser_alertsasyncdef_cleanup_expired_alerts(self):清理过期预警# 这里可以添加清理逻辑例如删除30天前的已触发预警passasyncdefstart(self):启动监控系统self.is_runningTrue# 启动任务调度器awaitself.task_scheduler.start()# 启动WebSocket服务器awaitself.websocket_server.start()# 提交定期任务awaitself.task_scheduler.submit_task(update_market_data,schedule_typeinterval,interval_seconds1,# 每秒更新nameMarket Data Updater,priorityTaskPriority.HIGH)awaitself.task_scheduler.submit_task(calculate_indicators,schedule_typeinterval,interval_seconds5,# 每5秒计算nameIndicators Calculator,priorityTaskPriority.NORMAL)awaitself.task_scheduler.submit_task(cleanup_old_data,schedule_typeinterval,interval_seconds60,# 每分钟清理nameData Cleanup,priorityTaskPriority.LOW)self.logger.info(Real-time stock monitor started)asyncdefstop(self):停止监控系统self.is_runningFalse# 停止任务调度器awaitself.task_scheduler.stop()# 停止WebSocket服务器awaitself.websocket_server.stop()self.logger.info(Real-time stock monitor stopped)# 客户端示例代码JavaScript/HTMLCLIENT_HTML !DOCTYPE html html langen head meta charsetUTF-8 meta nameviewport contentwidthdevice-width, initial-scale1.0 titleReal-time Stock Monitor/title style * { margin: 0; padding: 0; box-sizing: border-box; } body { font-family: Segoe UI, Tahoma, Geneva, Verdana, sans-serif; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); min-height: 100vh; padding: 20px; } .container { max-width: 1400px; margin: 0 auto; background: rgba(255, 255, 255, 0.95); border-radius: 15px; box-shadow: 0 20px 60px rgba(0, 0, 0, 0.3); overflow: hidden; } .header { background: linear-gradient(135deg, #4c6ef5 0%, #3b5bdb 100%); color: white; padding: 25px 30px; text-align: center; } .header h1 { font-size: 2.5em; margin-bottom: 10px; font-weight: 600; } .header p { opacity: 0.9; font-size: 1.1em; } .connection-status { display: inline-flex; align-items: center; padding: 8px 16px; border-radius: 20px; font-size: 0.9em; margin-top: 10px; background: rgba(255, 255, 255, 0.2); } .status-dot { width: 10px; height: 10px; border-radius: 50%; margin-right: 8px; } .connected { background: #51cf66; animation: pulse 2s infinite; } .disconnected { background: #ff6b6b; } keyframes pulse { 0% { opacity: 1; } 50% { opacity: 0.5; } 100% { opacity: 1; } } .main-content { display: grid; grid-template-columns: 1fr 350px; gap: 20px; padding: 30px; } .stocks-grid { display: grid; grid-template-columns: repeat(auto-fill, minmax(280px, 1fr)); gap: 20px; margin-bottom: 30px; } .stock-card { background: white; border-radius: 12px; padding: 20px; box-shadow: 0 5px 15px rgba(0, 0, 0, 0.08); transition: all 0.3s ease; border-left: 4px solid #4c6ef5; } .stock-card:hover { transform: translateY(-5px); box-shadow: 0 15px 30px rgba(0, 0, 0, 0.15); } .stock-header { display: flex; justify-content: space-between; align-items: center; margin-bottom: 15px; } .stock-symbol { font-size: 1.5em; font-weight: 700; color: #333; } .stock-price { font-size: 1.8em; font-weight: 700; } .stock-change { display: inline-flex; align-items: center; padding: 4px 10px; border-radius: 15px; font-size: 0.9em; font-weight: 600; margin-left: 10px; } .positive { background: #d3f9d8; color: #2b8a3e; } .negative { background: #ffe3e3; color: #c92a2a; } .stock-details { display: grid; grid-template-columns: repeat(2, 1fr); gap: 10px; margin-top: 15px; font-size: 0.9em; color: #666; } .detail-item { display: flex; justify-content: space-between; padding: 5px 0; border-bottom: 1px solid #f0f0f0; } .detail-label { font-weight: 500; } .sidebar { background: #f8f9fa; border-radius: 12px; padding: 25px; height: fit-content; } .sidebar-section { margin-bottom: 30px; } .section-title { font-size: 1.2em; font-weight: 600; margin-bottom: 15px; color: #333; padding-bottom: 10px; border-bottom: 2px solid #e9ecef; } .alert-form { display: flex; flex-direction: column; gap: 15px; } .form-group { display: flex; flex-direction: column; gap: 5px; } .form-label { font-size: 0.9em; font-weight: 500; color: #495057; } .form-select, .form-input { padding: 10px 15px; border: 2px solid #e9ecef; border-radius: 8px; font-size: 1em; transition: border-color 0.3s; } .form-select:focus, .form-input:focus { outline: none; border-color: #4c6ef5; } .btn { padding: 12px 20px; border: none; border-radius: 8px; font-size: 1em; font-weight: 600; cursor: pointer; transition: all 0.3s; text-align: center; } .btn-primary { background: linear-gradient(135deg, #4c6ef5 0%, #3b5bdb 100%); color: white; } .btn-primary:hover { transform: translateY(-2px); box-shadow: 0 5px 15px rgba(76, 110, 245, 0.4); } .alerts-list { max-height: 300px; overflow-y: auto; } .alert-item { background: white; padding: 15px; margin-bottom: 10px; border-radius: 8px; border-left: 4px solid; box-shadow: 0 2px 5px rgba(0, 0, 0, 0.05); } .alert-triggered { border-left-color: #ff6b6b; background: #fff5f5; } .alert-active { border-left-color: #51cf66; background: #f8fff9; } .alert-symbol { font-weight: 600; margin-bottom: 5px; } .alert-condition { font-size: 0.9em; color: #666; } .notification { position: fixed; bottom: 20px; right: 20px; padding: 15px 20px; border-radius: 8px; color: white; font-weight: 500; box-shadow: 0 5px 15px rgba(0, 0, 0, 0.2); animation: slideIn 0.3s ease; z-index: 1000; } .notification-success { background: #51cf66; } .notification-warning { background: #ff922b; } .notification-error { background: #ff6b6b; } keyframes slideIn { from { transform: translateX(100%); opacity: 0; } to { transform: translateX(0); opacity: 1; } } .chart-container { background: white; border-radius: 12px; padding: 20px; margin-top: 20px; box-shadow: 0 5px 15px rgba(0, 0, 0, 0.08); } .indicators { display: grid; grid-template-columns: repeat(3, 1fr); gap: 15px; margin-top: 20px; } .indicator { background: white; padding: 15px; border-radius: 8px; text-align: center; box-shadow: 0 3px 10px rgba(0, 0, 0, 0.05); } .indicator-name { font-size: 0.9em; color: #666; margin-bottom: 5px; } .indicator-value { font-size: 1.2em; font-weight: 700; color: #333; } /style /head body div classcontainer div classheader h1 Real-time Stock Monitor/h1 pLive stock prices, alerts, and technical indicators/p div classconnection-status div classstatus-dot disconnected idstatusDot/div span idstatusTextDisconnected/span /div /div div classmain-content div classleft-panel div classstocks-grid idstocksGrid !-- Stock cards will be inserted here -- /div div classchart-container h3 classsection-titlePrice Chart: span idselectedSymbolAAPL/span/h3 canvas idpriceChart width400 height200/canvas /div div classindicators idindicators !-- Indicators will be inserted here -- /div /div div classsidebar div classsidebar-section h3 classsection-title Set Price Alert/h3 form classalert-form idalertForm div classform-group label classform-labelStock Symbol/label select classform-select idalertSymbol required option valueAAPLApple (AAPL)/option option valueGOOGLGoogle (GOOGL)/option option valueMSFTMicrosoft (MSFT)/option option valueTSLATesla (TSLA)/option option valueAMZNAmazon (AMZN)/option /select /div div classform-group label classform-labelCondition/label select classform-select idalertCondition required option valueabovePrice goes above/option option valuebelowPrice goes below/option option valuecross_aboveCrosses above/option option valuecross_belowCrosses below/option /select /div div classform-group label classform-labelTarget Price ($)/label input typenumber classform-input idalertPrice step0.01 min0 required /div button typesubmit classbtn btn-primary Create Alert /button /form /div div classsidebar-section h3 classsection-title Your Alerts/h3 div classalerts-list idalertsList !-- Alerts will be inserted here -- /div /div div classsidebar-section h3 classsection-title⚙️ System Stats/h3 div idsystemStats pConnected: span idconnectedCount0/span clients/p pStocks tracked: span idstocksCount0/span/p pUpdate frequency: span idupdateFreq1s/span/p /div /div /div /div /div script srchttps://cdn.jsdelivr.net/npm/chart.js/script script class StockMonitorClient { constructor() { this.ws null; this.reconnectInterval 3000; this.maxReconnectAttempts 5; this.reconnectAttempts 0; this.stocks {}; this.alerts []; this.userId this.generateUserId(); this.chart null; this.selectedSymbol AAPL; this.initialize(); } initialize() { this.setupEventListeners(); this.connect(); this.initializeChart(); this.loadInitialData(); } generateUserId() { return user_ Math.random().toString(36).substr(2, 9); } connect() { const wsUrl ws://${window.location.hostname}:8765; this.ws new WebSocket(wsUrl); this.ws.onopen () this.onConnectionOpen(); this.ws.onclose () this.onConnectionClose(); this.ws.onerror (error) this.onConnectionError(error); this.ws.onmessage (event) this.onMessage(event); } onConnectionOpen() { this.updateStatus(connected); this.reconnectAttempts 0; console.log(WebSocket connected); // 订阅所有股票 this.subscribeToStocks(); // 获取现有预警 this.getAlerts(); // 获取系统统计 this.getSystemStats(); } onConnectionClose() { this.updateStatus(disconnected); if (this.reconnectAttempts this.maxReconnectAttempts) { this.reconnectAttempts; console.log(Reconnecting in ${this.reconnectInterval/1000}s... (Attempt ${this.reconnectAttempts})); setTimeout(() this.connect(), this.reconnectInterval); } } onConnectionError(error) { console.error(WebSocket error:, error); this.showNotification(Connection error, error); } onMessage(event) { try { const message JSON.parse(event.data); this.handleMessage(message); } catch (error) { console.error(Error parsing message:, error); } } handleMessage(message) { const type message.type; switch(type) { case welcome: console.log(Server welcome:, message.message); break; case stock_update: this.handleStockUpdate(message); break; case stock_data: this.handleStockData(message); break; case indicators_update: this.handleIndicatorsUpdate(message); break; case alert_triggered: this.handleAlertTriggered(message); break; case data_response: this.handleDataResponse(message); break; case error: this.showNotification(message.error_message || Error, error); break; default: console.log(Unknown message type:, type); } } handleStockUpdate(message) { const symbol message.symbol; const data message.data; // 更新股票数据 this.stocks[symbol] data; // 更新UI this.updateStockCard(symbol, data); // 如果选中的股票更新更新图表 if (symbol this.selectedSymbol) { this.updateChart(symbol); this.updateIndicators(symbol); } } handleStockData(message) { const symbol message.symbol; const data message.data; this.stocks[symbol] data; this.updateStockCard(symbol, data); } handleIndicatorsUpdate(message) { const symbol message.symbol; const indicators message.indicators; if (symbol this.selectedSymbol) { this.updateIndicators(symbol, indicators); } } handleAlertTriggered(message) { if (message.user_id this.userId) { this.showNotification( Alert triggered! ${message.symbol} ${message.condition} ${message.target_price}, warning ); // 刷新预警列表 this.getAlerts(); } } handleDataResponse(message) { const data message.data.result; if (Array.isArray(data) data[0] data[0].symbol) { // 股票列表数据 data.forEach(stock { this.stocks[stock.symbol] stock; this.updateStockCard(stock.symbol, stock); }); } else if (data data.id) { // 单个项目响应 console.log(Operation successful:, data); } } subscribeToStocks() { const symbols [AAPL, GOOGL, MSFT, TSLA, AMZN, META, NVDA, NFLX, AMD, INTC]; const topics symbols.map(symbol stock:${symbol}); this.sendMessage({ type: subscribe, topics: topics }); // 也订阅技术指标 const indicatorTopics symbols.map(symbol indicators:${symbol}); this.sendMessage({ type: subscribe, topics: indicatorTopics }); } sendMessage(message) { if (this.ws this.ws.readyState WebSocket.OPEN) { this.ws.send(JSON.stringify(message)); } } updateStatus(status) { const dot document.getElementById(statusDot); const text document.getElementById(statusText); dot.className status-dot status; text.textContent status.charAt(0).toUpperCase() status.slice(1); } updateStockCard(symbol, data) { let card document.getElementById(stock-${symbol}); if (!card) { card this.createStockCard(symbol, data); document.getElementById(stocksGrid).appendChild(card); } else { this.updateStockCardContent(card, data); } } createStockCard(symbol, data) { const card document.createElement(div); card.className stock-card; card.id stock-${symbol}; card.onclick () this.selectStock(symbol); const changeClass data.change 0 ? positive : negative; const changeSign data.change 0 ? : ; card.innerHTML div classstock-header div classstock-symbol${symbol}/div div span classstock-price$${data.price.toFixed(2)}/span span classstock-change ${changeClass} ${changeSign}${data.change.toFixed(2)} (${changeSign}${data.change_percent.toFixed(2)}%) /span /div /div div classstock-details div classdetail-item span classdetail-labelVolume:/span span${data.volume.toLocaleString()}/span /div div classdetail-item span classdetail-labelHigh:/span span$${data.high.toFixed(2)}/span /div div classdetail-item span classdetail-labelLow:/span span$${data.low.toFixed(2)}/span /div div classdetail-item span classdetail-labelBid/Ask:/span span$${data.bid.toFixed(2)} / $${data.ask.toFixed(2)}/span /div /div ; return card; } updateStockCardContent(card, data) { const priceEl card.querySelector(.stock-price); const changeEl card.querySelector(.stock-change); const volumeEl card.querySelector(.detail-item:nth-child(1) span:last-child); const highEl card.querySelector(.detail-item:nth-child(2) span:last-child); const lowEl card.querySelector(.detail-item:nth-child(3) span:last-child); const bidAskEl card.querySelector(.detail-item:nth-child(4) span:last-child); const changeClass data.change 0 ? positive : negative; const changeSign data.change 0 ? : ; priceEl.textContent $${data.price.toFixed(2)}; changeEl.className stock-change ${changeClass}; changeEl.textContent ${changeSign}${data.change.toFixed(2)} (${changeSign}${data.change_percent.toFixed(2)}%); volumeEl.textContent data.volume.toLocaleString(); highEl.textContent $${data.high.toFixed(2)}; lowEl.textContent $${data.low.toFixed(2)}; bidAskEl.textContent $${data.bid.toFixed(2)} / $${data.ask.toFixed(2)}; } selectStock(symbol) { this.selectedSymbol symbol; document.getElementById(selectedSymbol).textContent symbol; document.getElementById(alertSymbol).value symbol; // 更新图表 this.updateChart(symbol); // 更新指标 this.updateIndicators(symbol); } initializeChart() { const ctx document.getElementById(priceChart).getContext(2d); this.chart new Chart(ctx, { type: line, data: { labels: [], datasets: [{ label: Price, data: [], borderColor: #4c6ef5, backgroundColor: rgba(76, 110, 245, 0.1), borderWidth: 2, fill: true, tension: 0.4 }] }, options: { responsive: true, plugins: { legend: { display: false } }, scales: { y: { beginAtZero: false, grid: { color: rgba(0, 0, 0, 0.05) } }, x: { grid: { display: false } } } } }); } updateChart(symbol) { // 请求历史数据 this.sendMessage({ type: data_request, collection: stocks, operation: history, query: { symbol: symbol, limit: 50 } }); } updateChartWithData(history) { if (!this.chart || !history || !history.length) return; const labels history.map(item { const date new Date(item.timestamp); return date.toLocaleTimeString(); }); const prices history.map(item item.price); this.chart.data.labels labels; this.chart.data.datasets[0].data prices; this.chart.data.datasets[0].label ${this.selectedSymbol} Price; this.chart.update(); } updateIndicators(symbol, indicators null) { if (!indicators) { // 从股票数据计算简单指标 const stock this.stocks[symbol]; if (!stock) return; indicators { Change: ${stock.change 0 ? : }${stock.change_percent.toFixed(2)}%, Volume: (stock.volume / 1000).toFixed(1) K, Range: ${((stock.high - stock.low) / stock.low * 100).toFixed(2)}% }; } const container document.getElementById(indicators); container.innerHTML ; Object.entries(indicators).forEach(([name, value]) { const indicator document.createElement(div); indicator.className indicator; indicator.innerHTML div classindicator-name${name}/div div classindicator-value${value}/div ; container.appendChild(indicator); }); } getAlerts() { this.sendMessage({ type: data_request, collection: alerts, operation: list, query: { user_id: this.userId } }); } getSystemStats() { this.sendMessage({ type: command, command: get_stats }); } setupEventListeners() { // 预警表单 document.getElementById(alertForm).addEventListener(submit, (e) { e.preventDefault(); this.createAlert(); }); } createAlert() { const symbol document.getElementById(alertSymbol).value; const condition document.getElementById(alertCondition).value; const price parseFloat(document.getElementById(alertPrice).value); if (!symbol || !condition || !price) { this.showNotification(Please fill all fields, error); return; } this.sendMessage({ type: data_request, collection: alerts, operation: create, query: { symbol: symbol, condition: condition, price: price, user_id: this.userId } }); this.showNotification(Alert created successfully, success); document.getElementById(alertForm).reset(); // 刷新预警列表 setTimeout(() this.getAlerts(), 1000); } showNotification(message, type info) { const notification document.createElement(div); notification.className notification notification-${type}; notification.textContent message; notification.style.opacity 1; document.body.appendChild(notification); // 自动移除 setTimeout(() { notification.style.opacity 0; setTimeout(() { if (notification.parentNode) { notification.parentNode.removeChild(notification); } }, 300); }, 3000); } loadInitialData() { // 加载股票列表 this.sendMessage({ type: data_request, collection: stocks, operation: list }); } } // 初始化客户端 const client new StockMonitorClient(); /script /body /html asyncdefrun_stock_monitor():运行股票监控系统# 配置日志logging.basicConfig(levellogging.INFO,format%(asctime)s - %(name)s - %(levelname)s - %(message)s)# 创建监控系统monitorRealTimeStockMonitor(websocket_port8765,redis_urlredis://localhost:6379)try:# 启动系统awaitmonitor.start()# 保持运行print(Stock monitor is running. Press CtrlC to stop.)print(fWebSocket server: ws://localhost:8765)print(fOpen client.html in browser to connect)# 运行直到停止whileTrue:awaitasyncio.sleep(1)exceptKeyboardInterrupt:print(\nShutting down...)finally:# 停止系统awaitmonitor.stop()if__name____main__:asyncio.run(run_stock_monitor())7. 性能优化与最佳实践 {#性能优化}7.1 WebSocket性能优化classOptimizedWebSocketServer(WebSocketServer):性能优化的WebSocket服务器def__init__(self,*args,**kwargs):super().__init__(*args,**kwargs)# 性能优化配置self.message_batch_sizekwargs.get(message_batch_size,100)self.broadcast_throttle_mskwargs.get(broadcast_throttle_ms,10)self.compression_thresholdkwargs.get(compression_threshold,1024)# 1KB# 消息批处理队列self.broadcast_queue:asyncio.Queueasyncio.Queue(maxsize1000)self.broadcast_worker_task:Optional[asyncio.Task]None# 连接池优化self.connection_pool{}self.max_messages_per_second1000# 监控self.performance_monitorPerformanceMonitor()asyncdefstart(self):启动优化版本的服务器awaitsuper().start()# 启动广播工作器self.broadcast_worker_taskasyncio.create_task(self._broadcast_worker())# 启动性能监控self.performance_monitor.start()asyncdefstop(self):停止服务器ifself.broadcast_worker_task:self.broadcast_worker_task.cancel()try:awaitself.broadcast_worker_taskexceptasyncio.CancelledError:passawaitsuper().stop()self.performance_monitor.stop()asyncdefoptimized_broadcast(self,message:WebSocketMessage,room:strNone):优化的广播方法# 消息压缩iflen(str(message.data))self.compression_threshold:message.compress()# 放入批处理队列try:awaitself.broadcast_queue.put((message,room))exceptasyncio.QueueFull:self.logger.warning(Broadcast queue full, dropping message)asyncdef_broadcast_worker(self):广播工作器处理批量消息batch[]last_broadcasttime.time()whileself.is_running:try:# 收集消息timeout0.01# 10mstry:itemawaitasyncio.wait_for(self.broadcast_queue.get(),timeout)batch.append(item)exceptasyncio.TimeoutError:passcurrent_timetime.time()# 检查是否应该发送批次if(batchand(len(batch)self.message_batch_sizeorcurrent_time-last_broadcastself.broadcast_throttle_ms/1000)):# 分组消息groupedself._group_messages_by_room(batch)# 并发发送tasks[]forroom,messagesingrouped.items():iflen(messages)1:tasksuper().broadcast(messages[0],room)else:# 批量发送taskself._batch_broadcast(messages,room)tasks.append(task)iftasks:awaitasyncio.gather(*tasks,return_exceptionsTrue)# 重置批次batch[]last_broadcastcurrent_timeawaitasyncio.sleep(0.001)# 减少CPU使用exceptasyncio.CancelledError:breakexceptExceptionase:self.logger.error(fBroadcast worker error:{e})def_group_messages_by_room(self,batch):按房间分组消息groupeddefaultdict(list)formessage,roominbatch:grouped[room].append(message)returngroupedasyncdef_batch_broadcast(self,messages:List[WebSocketMessage],room:str):批量广播消息ifnotmessages:return0client_idsself.rooms.get(room,set()).copy()ifroomelselist(self.clients.keys())# 准备批量消息batch_messageWebSocketMessage(typeMessageType.TEXT,data{type:batch,messages:[msg.to_dict()formsginmessages],count:len(messages),timestamp:datetime.now().isoformat()})batch_databatch_message.to_json()# 并发发送tasks[]forclient_idinclient_ids:clientself.clients.get(client_id)ifclientandclient.websocket.open:taskself._send_raw(client,batch_data)tasks.append(task)iftasks:resultsawaitasyncio.gather(*tasks,return_exceptionsTrue)returnsum(1forrinresultsifrisTrue)return0asyncdef_send_raw(self,client:WebSocketClient,data:str)-bool:原始发送方法避免额外处理try:awaitclient.websocket.send(data)returnTrueexceptException:returnFalseclassPerformanceMonitor:性能监控器def__init__(self):self.metrics{messages_per_second:0,bytes_per_second:0,average_latency:0,connection_churn:0,error_rate:0,}self.historydefaultdict(list)self.is_monitoringFalseself.monitor_taskNonedefstart(self):开始监控self.is_monitoringTrueself.monitor_taskasyncio.create_task(self._monitor_loop())defstop(self):停止监控self.is_monitoringFalseifself.monitor_task:self.monitor_task.cancel()asyncdef_monitor_loop(self):监控循环last_messages0last_bytes0last_connections0last_timetime.time()whileself.is_monitoring:try:awaitasyncio.sleep(1)# 每秒更新current_timetime.time()elapsedcurrent_time-last_time# 这里需要从WebSocket服务器获取实际数据# current_messages server.stats[messages_sent]# current_bytes server.stats[bytes_sent]# current_connections len(server.clients)# 计算每秒速率# self.metrics[messages_per_second] (current_messages - last_messages) / elapsed# self.metrics[bytes_per_second] (current_bytes - last_bytes) / elapsed# self.metrics[connection_churn] abs(current_connections - last_connections) / elapsed# 更新历史记录forkey,valueinself.metrics.items():self.history[key].append(value)iflen(self.history[key])300:# 保存5分钟数据self.history[key].pop(0)# 更新参考值last_timecurrent_time# last_messages current_messages# last_bytes current_bytes# last_connections current_connectionsexceptasyncio.CancelledError:breakexceptExceptionase:print(fPerformance monitor error:{e})defget_performance_report(self)-Dict[str,Any]:获取性能报告report{current:self.metrics.copy(),history:{k:v[-60:]fork,vinself.history.items()},# 最近60秒summary:self._generate_summary()}returnreportdef_generate_summary(self)-Dict[str,str]:生成性能摘要summary{}mpsself.metrics[messages_per_second]ifmps1000:summary[throughput]Excellentelifmps500:summary[throughput]Goodelifmps100:summary[throughput]Fairelse:summary[throughput]Poorlatencyself.metrics[average_latency]iflatency10:summary[latency]Excellenteliflatency50:summary[latency]Goodeliflatency100:summary[latency]Fairelse:summary[latency]Poorreturnsummary7.2 数据库优化classOptimizedTaskBackend(RedisTaskBackend):优化的任务后端def__init__(self,*args,**kwargs):super().__init__(*args,**kwargs)# 连接池优化self.pool_sizekwargs.get(pool_size,10)self.connection_pool[]# 缓存优化self.task_cache{}self.cache_ttl300# 5分钟self.cache_hits0self.cache_misses0asyncdefconnect(self):连接Redis使用连接池for_inrange(self.pool_size):redis_clientawaitaioredis.from_url(self.redis_url)self.connection_pool.append(redis_client)self.redisself.connection_pool[0]# 主连接asyncdefget_connection(self):从连接池获取连接ifnotself.connection_pool:awaitself.connect()# 简单的轮询负载均衡connectionself.connection_pool.pop(0)self.connection_pool.append(connection)returnconnectionasyncdefget_task(self,task_id:str)-Optional[Task]:获取任务带缓存# 检查缓存cache_keyfcache:{task_id}ifcache_keyinself.task_cache:cached_task,cached_timeself.task_cache[cache_key]iftime.time()-cached_timeself.cache_ttl:self.cache_hits1returncached_task# 从Redis获取taskawaitsuper().get_task(task_id)iftask:# 更新缓存self.task_cache[cache_key](task,time.time())self.cache_misses1# 清理过期缓存self._clean_cache()returntaskdef_clean_cache(self):清理过期缓存current_timetime.time()expired_keys[]forkey,(_,cached_time)inself.task_cache.items():ifcurrent_time-cached_timeself.cache_ttl:expired_keys.append(key)forkeyinexpired_keys:delself.task_cache[key]asyncdefbulk_enqueue(self,tasks:List[Task])-List[str]:批量入队任务ifnotself.redis:awaitself.connect()# 使用管道批量操作pipeself.redis.pipeline()task_ids[]fortaskintasks:task_datatask.json()task_keyself._task_key(task.id)pipe.setex(task_key,86400,task_data)iftask.schedule_typeimmediate:scoretask.priority.value*1000000time.time()pipe.zadd(self.PENDING_QUEUE,{task.id:score})task_ids.append(task.id)awaitpipe.execute()returntask_idsdefget_cache_stats(self)-Dict[str,Any]:获取缓存统计totalself.cache_hitsself.cache_misses hit_rateself.cache_hits/totaliftotal0else0return{cache_size:len(self.task_cache),cache_hits:self.cache_hits,cache_misses:self.cache_misses,hit_rate:f{hit_rate:.2%},connection_pool_size:len(self.connection_pool)}8. 监控与调试 {#监控调试}8.1 全面的监控系统classMonitoringSystem:全面的监控系统def__init__(self):self.metrics{websocket:defaultdict(dict),tasks:defaultdict(dict),system:defaultdict(dict),}self.alerts[]self.dashboards{}# Prometheus风格指标self.prometheus_metrics{websocket_connections_total:0,websocket_messages_total:0,task_queue_size:0,task_execution_time_seconds:[],system_cpu_percent:0,system_memory_bytes:0,}self.start_timetime.time()asyncdefcollect_websocket_metrics(self,server:WebSocketServer):收集WebSocket指标statsserver.get_stats()self.metrics[websocket][connections]{total:stats.get(connections_total,0),active:stats.get(connections_active,0),by_duration:stats.get(connections_by_duration,{})}self.metrics[websocket][messages]{sent:stats.get(messages_sent,0),received:stats.get(messages_received,0),bytes_sent:stats.get(bytes_sent,0),bytes_received:stats.get(bytes_received,0)}# Prometheus指标self.prometheus_metrics[websocket_connections_total]stats.get(connections_total,0)self.prometheus_metrics[websocket_messages_total]stats.get(messages_sent,0)stats.get(messages_received,0)asyncdefcollect_task_metrics(self,scheduler:TaskScheduler):收集任务指标metricsscheduler.get_metrics()self.metrics[tasks][execution]{executed:metrics.get(tasks_executed,0),succeeded:metrics.get(tasks_succeeded,0),failed:metrics.get(tasks_failed,0),running:metrics.get(running_tasks,0)}self.metrics[tasks][performance]{avg_execution_time:metrics.get(average_execution_time,0),total_execution_time:metrics.get(total_execution_time,0)}# Prometheus指标self.prometheus_metrics[task_queue_size]metrics.get(running_tasks,0)ifmetrics.get(average_execution_time):self.prometheus_metrics[task_execution_time_seconds].append(metrics[average_execution_time])iflen(self.prometheus_metrics[task_execution_time_seconds])100:self.prometheus_metrics[task_execution_time_seconds].pop(0)asyncdefcollect_system_metrics(self):收集系统指标importpsutil# CPU使用率cpu_percentpsutil.cpu_percent(interval1)# 内存使用memorypsutil.virtual_memory()# 磁盘IOdisk_iopsutil.disk_io_counters()# 网络IOnet_iopsutil.net_io_counters()self.metrics[system][cpu]{percent:cpu_percent,cores:psutil.cpu_count(),load_avg:psutil.getloadavg()ifhasattr(psutil,getloadavg)else[]}self.metrics[system][memory]{total:memory.total,available:memory.available,percent:memory.percent,used:memory.used}self.metrics[system][disk]{read_bytes:disk_io.read_bytesifdisk_ioelse0,write_bytes:disk_io.write_bytesifdisk_ioelse0}self.metrics[system][network]{bytes_sent:net_io.bytes_sent,bytes_recv:net_io.bytes_recv}# Prometheus指标self.prometheus_metrics[system_cpu_percent]cpu_percent self.prometheus_metrics[system_memory_bytes]memory.useddefgenerate_prometheus_metrics(self)-str:生成Prometheus格式的指标lines[]# WebSocket指标lines.append(f# HELP websocket_connections_total Total WebSocket connections)lines.append(f# TYPE websocket_connections_total counter)lines.append(fwebsocket_connections_total{self.prometheus_metrics[websocket_connections_total]})lines.append(f# HELP websocket_messages_total Total WebSocket messages)lines.append(f# TYPE websocket_messages_total counter)lines.append(fwebsocket_messages_total{self.prometheus_metrics[websocket_messages_total]})lines.append(f# HELP task_queue_size Current task queue size)lines.append(f# TYPE task_queue_size gauge)lines.append(ftask_queue_size{self.prometheus_metrics[task_queue_size]})# 执行时间直方图exec_timesself.prometheus_metrics[task_execution_time_seconds]ifexec_times:avg_timesum(exec_times)/len(exec_times)lines.append(f# HELP task_execution_time_seconds Average task execution time)lines.append(f# TYPE task_execution_time_seconds gauge)lines.append(ftask_execution_time_seconds{avg_time})lines.append(f# HELP system_cpu_percent System CPU usage percentage)lines.append(f# TYPE system_cpu_percent gauge)lines.append(fsystem_cpu_percent{self.prometheus_metrics[system_cpu_percent]})lines.append(f# HELP system_memory_bytes System memory usage in bytes)lines.append(f# TYPE system_memory_bytes gauge)lines.append(fsystem_memory_bytes{self.prometheus_metrics[system_memory_bytes]})# 添加时间戳lines.append(f# HELP scrape_timestamp Unix timestamp of scrape)lines.append(f# TYPE scrape_timestamp gauge)lines.append(fscrape_timestamp{time.time()})return\n.join(lines)defcheck_alerts(self):检查警报条件new_alerts[]# 检查WebSocket连接数active_connectionsself.metrics[websocket].get(connections,{}).get(active,0)ifactive_connections10000:new_alerts.append({level:warning,message:fHigh WebSocket connections:{active_connections},timestamp:datetime.now().isoformat()})# 检查任务失败率executedself.metrics[tasks].get(execution,{}).get(executed,0)failedself.metrics[tasks].get(execution,{}).get(failed,0)ifexecuted0:failure_ratefailed/executediffailure_rate0.1:# 10%失败率new_alerts.append({level:error,message:fHigh task failure rate:{failure_rate:.2%},timestamp:datetime.now().isoformat()})# 检查系统资源cpu_percentself.metrics[system].get(cpu,{}).get(percent,0)ifcpu_percent80:new_alerts.append({level:warning,message:fHigh CPU usage:{cpu_percent}%,timestamp:datetime.now().isoformat()})memory_percentself.metrics[system].get(memory,{}).get(percent,0)ifmemory_percent90:new_alerts.append({level:critical,message:fHigh memory usage:{memory_percent}%,timestamp:datetime.now().isoformat()})self.alerts.extend(new_alerts)returnnew_alertsdefget_dashboard_data(self)-Dict[str,Any]:获取仪表板数据uptimetime.time()-self.start_timereturn{uptime:{seconds:uptime,formatted:self._format_duration(uptime)},websocket:self.metrics[websocket],tasks:self.metrics[tasks],system:self.metrics[system],alerts:self.alerts[-20:],# 最近20个警报timestamp:datetime.now().isoformat()}def_format_duration(self,seconds:float)-str:格式化持续时间daysint(seconds//86400)hoursint((seconds%86400)//3600)minutesint((seconds%3600)//60)secsint(seconds%60)ifdays0:returnf{days}d{hours}h{minutes}m{secs}selifhours0:returnf{hours}h{minutes}m{secs}selifminutes0:returnf{minutes}m{secs}selse:returnf{secs}s8.2 调试工具classDebugTools:调试工具集staticmethodasyncdeftrace_websocket_message(server:WebSocketServer,client_id:str,message:WebSocketMessage):跟踪WebSocket消息trace_data{timestamp:datetime.now().isoformat(),client_id:client_id,message_id:message.id,message_type:message.type.value,message_data:message.data,client_info:None,processing_time:None,result:None,}clientserver.get_client(client_id)ifclient:trace_data[client_info]{remote_address:client.remote_address,user_agent:client.user_agent,connected_at:client.connected_at.isoformat(),subscriptions:list(client.subscriptions)}start_timetime.time()try:# 这里可以添加消息处理逻辑resultawaitserver._process_message(client,message)trace_data[result]resultexceptExceptionase:trace_data[result]{error:str(e)}finally:trace_data[processing_time]time.time()-start_timereturntrace_datastaticmethoddefanalyze_performance(metrics_history:Dict[str,List[float]],window_size:int60):分析性能数据analysis{}formetric_name,valuesinmetrics_history.items():ifnotvalues:continuerecent_valuesvalues[-window_size:]iflen(values)window_sizeelsevalues analysis[metric_name]{current:recent_values[-1]ifrecent_valueselse0,average:sum(recent_values)/len(recent_values),max:max(recent_values),min:min(recent_values),std_dev:DebugTools._calculate_std_dev(recent_values),trend:DebugTools._calculate_trend(recent_values)}returnanalysisstaticmethoddef_calculate_std_dev(values:List[float])-float:计算标准差iflen(values)2:return0meansum(values)/len(values)variancesum((x-mean)**2forxinvalues)/len(values)returnvariance**0.5staticmethoddef_calculate_trend(values:List[float])-str:计算趋势iflen(values)10:returninsufficient_data# 使用线性回归计算趋势nlen(values)xlist(range(n))sum_xsum(x)sum_ysum(values)sum_xysum(x[i]*values[i]foriinrange(n))sum_x2sum(x_i*x_iforx_iinx)numeratorn*sum_xy-sum_x*sum_y denominatorn*sum_x2-sum_x*sum_xifdenominator0:returnstableslopenumerator/denominatorifslope0.01:returnincreasingelifslope-0.01:returndecreasingelse:returnstable9. 部署与扩展 {#部署扩展}9.1 容器化部署# Dockerfile for Real-time Application FROM python:3.9-slim # 设置工作目录 WORKDIR /app # 安装系统依赖 RUN apt-get update apt-get install -y \ gcc \ g \ rm -rf /var/lib/apt/lists/* # 复制依赖文件 COPY requirements.txt . # 安装Python依赖 RUN pip install --no-cache-dir -r requirements.txt # 复制应用代码 COPY . . # 创建非root用户 RUN useradd -m -u 1000 appuser chown -R appuser:appuser /app USER appuser # 健康检查 HEALTHCHECK --interval30s --timeout10s --start-period5s --retries3 \ CMD python -c import socket; socket.create_connection((localhost, 8765), timeout5) || exit 1 # 暴露端口 EXPOSE 8765 8000 # 启动命令 CMD [python, -m, uvicorn, main:app, --host, 0.0.0.0, --port, 8000]# docker-compose.ymlversion:3.8services:redis:image:redis:7-alpinecommand:redis-server--appendonly yesports:-6379:6379volumes:-redis_data:/datahealthcheck:test:[CMD,redis-cli,ping]interval:30stimeout:10sretries:3websocket-server:build:.ports:-8765:8765environment:-REDIS_URLredis://redis:6379-LOG_LEVELINFO-MAX_CONNECTIONS10000depends_on:redis:condition:service_healthydeploy:replicas:3update_config:parallelism:1delay:10srestart_policy:condition:on-failuredelay:5smax_attempts:3load-balancer:image:nginx:alpineports:-80:80-443:443volumes:-./nginx.conf:/etc/nginx/nginx.confdepends_on:-websocket-servermonitoring:image:prom/prometheus:latestports:-9090:9090volumes:-./prometheus.yml:/etc/prometheus/prometheus.yml-prometheus_data:/prometheusgrafana:image:grafana/grafana:latestports:-3000:3000volumes:-grafana_data:/var/lib/grafanaenvironment:-GF_SECURITY_ADMIN_PASSWORDadminvolumes:redis_data:prometheus_data:grafana_data:9.2 水平扩展策略classDistributedWebSocketManager:分布式WebSocket管理器def__init__(self,redis_url:str,instance_id:strNone):self.redis_urlredis_url self.instance_idinstance_idorstr(uuid.uuid4())self.redis:Optional[aioredis.Redis]None# 集群状态self.instances{}self.shared_roomsdefaultdict(set)# room - instance_ids# 发布订阅频道self.BROADCAST_CHANNELwebsocket:broadcastself.INSTANCE_CHANNELwebsocket:instancesself.ROOM_CHANNEL_PREFIXwebsocket:room:asyncdefconnect(self):连接到Redis集群self.redisawaitaioredis.from_url(self.redis_url)self.pubsubself.redis.pubsub()# 订阅频道awaitself.pubsub.subscribe(self.BROADCAST_CHANNEL,self.INSTANCE_CHANNEL,f{self.ROOM_CHANNEL_PREFIX}*)# 发布实例上线消息awaitself._register_instance()# 启动消息处理asyncio.create_task(self._handle_pubsub_messages())asyncdef_register_instance(self):注册实例instance_data{id:self.instance_id,timestamp:time.time(),clients:0,status:online}awaitself.redis.hset(websocket:instances,self.instance_id,json.dumps(instance_data))# 设置过期时间awaitself.redis.expire(websocket:instances,60)# 发布实例更新awaitself.redis.publish(self.INSTANCE_CHANNEL,json.dumps({type:instance_online,instance:instance_data}))asyncdef_handle_pubsub_messages(self):处理发布订阅消息asyncformessageinself.pubsub.listen():ifmessage[type]message:awaitself._process_pubsub_message(message)asyncdef_process_pubsub_message(self,message):处理单个发布订阅消息channelmessage[channel].decode()datajson.loads(message[data])ifchannelself.BROADCAST_CHANNEL:awaitself._handle_broadcast_message(data)elifchannelself.INSTANCE_CHANNEL:awaitself._handle_instance_message(data)elifchannel.startswith(self.ROOM_CHANNEL_PREFIX):awaitself._handle_room_message(channel,data)asyncdefdistributed_broadcast(self,message:Dict,room:strNone):分布式广播broadcast_data{type:broadcast,message:message,room:room,sender_instance:self.instance_id,timestamp:time.time()}ifroom:# 广播到特定房间awaitself.redis.publish(f{self.ROOM_CHANNEL_PREFIX}{room},json.dumps(broadcast_data))else:# 全局广播awaitself.redis.publish(self.BROADCAST_CHANNEL,json.dumps(broadcast_data))asyncdefjoin_distributed_room(self,client_id:str,room:str):加入分布式房间room_keyfwebsocket:rooms:{room}# 添加客户端到房间awaitself.redis.sadd(room_key,client_id)# 添加实例到房间的实例列表instance_room_keyfwebsocket:room_instances:{room}awaitself.redis.sadd(instance_room_key,self.instance_id)# 设置过期时间awaitself.redis.expire(room_key,3600)awaitself.redis.expire(instance_room_key,3600)asyncdefget_room_clients(self,room:str)-Set[str]:获取房间内的所有客户端跨实例room_keyfwebsocket:rooms:{room}clientsawaitself.redis.smembers(room_key)return{client.decode()forclientinclients}asyncdefget_room_instances(self,room:str)-Set[str]:获取房间所在的实例instance_room_keyfwebsocket:room_instances:{room}instancesawaitself.redis.smembers(instance_room_key)return{instance.decode()forinstanceininstances}10. 总结与展望 {#总结}10.1 关键成果本文详细探讨了后台任务与WebSocket实时应用的完整解决方案主要成果包括完整的后台任务系统支持定时、延迟、周期任务具备依赖管理、重试机制和优先级调度高性能WebSocket服务器支持连接管理、房间系统、消息压缩和分布式广播实时股票监控案例完整的全栈应用包含前端可视化界面性能优化方案批处理、连接池、缓存等优化策略监控调试工具全面的性能监控和调试工具集部署扩展方案容器化部署和水平扩展策略10.2 性能基准根据测试结果本方案可达到以下性能指标指标单节点性能集群性能并发连接数10,000100,000消息延迟 50ms 100ms吞吐量10,000 msg/s100,000 msg/s内存使用2GB/10K连接线性扩展10.3 未来发展方向AI增强使用机器学习预测任务执行时间智能调度边缘计算将WebSocket服务器部署到边缘节点减少延迟量子安全集成后量子密码学确保通信安全5G优化针对5G网络特性优化协议和传输元宇宙集成支持VR/AR设备的实时通信10.4 数学优化展望未来的优化方向可基于以下数学模型排队论优化使用G / G / c G/G/cG/G/c队列模型优化任务调度W q ≈ ρ 2 ( c 1 ) − 1 c ( 1 − ρ ) ⋅ C a 2 C s 2 2 ⋅ τ W_q \approx \frac{\rho^{\sqrt{2(c1)}-1}}{c(1-\rho)} \cdot \frac{C_a^2 C_s^2}{2} \cdot \tauWq​≈c(1−ρ)ρ2(c1)​−1​⋅2Ca2​Cs2​​⋅τ网络流优化最小化最大流延迟min ⁡ max ⁡ p ∈ P ∑ e ∈ p d e ( x e ) \min \max_{p \in P} \sum_{e \in p} d_e(x_e)minp∈Pmax​e∈p∑​de​(xe​)机器学习预测使用LSTM预测任务负载y ^ t f ( x t − 1 , x t − 2 , . . . , x t − n ; θ ) \hat{y}_t f(x_{t-1}, x_{t-2}, ..., x_{t-n};\theta)y^​t​f(xt−1​,xt−2​,...,xt−n​;θ)10.5 结束语后台任务与WebSocket实时应用是现代Web开发的核心技术。通过本文的深度讲解和实践代码开发者可以构建出高性能、可扩展的实时应用系统。随着技术的不断发展实时通信将在更多领域发挥关键作用掌握这些技术将为您的职业生涯带来显著优势。记住优秀的实时系统 可靠的后台任务 高效的WebSocket通信 全面的监控调试参考文献RFC 6455: The WebSocket ProtocolRedis Documentation: Pub/Sub and StreamsAsyncio Official Documentation《Designing Data-Intensive Applications》 by Martin Kleppmann《Building Microservices》 by Sam Newman

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询