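JWT Authentication and OAuth2 Integration: Building Secure Modern APIs

Contents
  Introduction
  1. JWT authentication basics
    1.1 JWT structure
      1.1.1 Header
      1.1.2 Payload
      1.1.3 Signature
    1.2 JWT workflow
  2. A complete JWT authentication system
    2.1 Project structure
    2.2 Configuration management
    2.3 Secure password handling
    2.4 JWT handler
    2.5 Data models
    2.6 Pydantic models
    2.7 Authentication dependencies
  3. OAuth2 integration
    3.1 OAuth2 workflow
    3.2 OAuth2 configuration management
    3.3 OAuth database service
  4. API endpoints
    4.1 Authentication endpoints
    4.2 User management endpoints
  5. Advanced security features
    5.1 Two-factor authentication
    5.2 Session management
    5.3 Security middleware
  6. Complete user service implementation
  7. Test cases
  8. Deployment configuration
    8.1 Environment variables
    8.2 Docker configuration
  9. Performance optimization and security recommendations
    9.1 Performance optimization
    9.2 Security recommendations
  10. Summary

Introduction

In modern web application and API development, authentication and authorization are the core of system security. JSON Web Token (JWT, a token format) and OAuth 2.0 (an authorization framework) have become the standard building blocks for these mechanisms. This article looks at how JWT works, how the OAuth 2.0 authorization flow proceeds, and how to implement a complete authentication and authorization system in FastAPI.

1. JWT authentication basics

1.1 JWT structure

A JWT consists of three parts, in the format header.payload.signature.

1.1.1 Header

    {
      "alg": "HS256",  // signing algorithm
      "typ": "JWT"     // token type
    }

1.1.2 Payload

The payload carries the claims, of which there are three kinds:

- Registered claims: predefined claims such as iss (issuer) and exp (expiration time)
- Public claims: publicly defined claims, which can be customized
- Private claims: custom claims used to share information between the two parties

1.1.3 Signature

The signature is computed over the Base64Url-encoded header and payload, joined by a dot, using the secret key:

$$\text{Signature} = \text{HMAC-SHA256}\big(\text{base64UrlEncode}(header) + \text{"."} + \text{base64UrlEncode}(payload),\ \text{secret}\big)$$

1.2 JWT workflow

Participants: Client, API Server, Database.

1. The client sends a login request (username/password) to the API server.
2. The API server verifies the user credentials against the database.
3. The database returns the user information.
4. The API server generates a JWT.
5. The API server returns the JWT to the client.
6. The client sends subsequent requests with the JWT (Authorization: Bearer token).
7. The API server verifies the JWT signature and expiration time.
8. The API server returns the requested resource.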
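To make the signature formula and steps 4 and 7 of the workflow concrete, here is a minimal sketch of HS256 signing and verification using only the Python standard library. The helper names `b64url`, `sign_hs256`, and `verify_hs256` are made up for this illustration. The sketch checks the signature only and ignores `exp` and other claims, so it is not a substitute for a maintained JWT library.

```python
import base64
import hashlib
import hmac
import json


def b64url(data: bytes) -> str:
    """Base64Url-encode without '=' padding, as JWT requires."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def sign_hs256(header: dict, payload: dict, secret: bytes) -> str:
    """Build header.payload.signature with an HMAC-SHA256 signature."""
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    signature = hmac.new(secret, signing_input.encode("utf-8"), hashlib.sha256).digest()
    return f"{signing_input}.{b64url(signature)}"


def verify_hs256(token: str, secret: bytes) -> bool:
    """Recompute the signature and compare it in constant time."""
    try:
        signing_input, signature = token.rsplit(".", 1)
    except ValueError:
        return False  # no dot at all: not a JWT
    expected = hmac.new(secret, signing_input.encode("utf-8"), hashlib.sha256).digest()
    return hmac.compare_digest(b64url(expected).encode("ascii"), signature.encode("utf-8"))


token = sign_hs256({"alg": "HS256", "typ": "JWT"}, {"sub": "42"}, b"demo-secret")
print(verify_hs256(token, b"demo-secret"))   # signature matches
print(verify_hs256(token, b"other-secret"))  # signature does not match
```

Because the server can recompute the signature from the token and its own secret, it does not need to store issued access tokens in order to verify them.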
2. A complete JWT authentication system

2.1 Project structure

    auth_system/
    ├── src/
    │   ├── __init__.py
    │   ├── main.py                  # FastAPI application entry point
    │   ├── config.py                # settings
    │   ├── database.py              # database setup
    │   ├── models.py                # data models
    │   ├── schemas.py               # Pydantic models
    │   ├── auth/
    │   │   ├── __init__.py
    │   │   ├── dependencies.py      # authentication dependencies
    │   │   ├── jwt_handler.py       # JWT handler
    │   │   ├── password.py          # password handling
    │   │   └── security.py          # security configuration
    │   ├── api/
    │   │   ├── __init__.py
    │   │   ├── v1/
    │   │   │   ├── __init__.py
    │   │   │   ├── endpoints/
    │   │   │   │   ├── auth.py
    │   │   │   │   ├── users.py
    │   │   │   │   └── admin.py
    │   │   │   └── routers.py
    │   ├── core/
    │   │   ├── __init__.py
    │   │   ├── exceptions.py        # custom exceptions
    │   │   └── security.py          # security utilities
    │   └── utils/
    │       ├── __init__.py
    │       └── email.py             # email utilities
    ├── tests/
    ├── alembic/                     # database migrations
    ├── .env
    └── requirements.txt

2.2 Configuration management

    # src/config.py
    import secrets
    from typing import List, Optional

    # pydantic v1 API (in v2, BaseSettings lives in the pydantic-settings package)
    from pydantic import BaseSettings, Field


    class Settings(BaseSettings):
        """Application settings."""

        # Application info
        APP_NAME: str = "FastAPI Authentication System"
        APP_VERSION: str = "1.0.0"
        API_V1_STR: str = "/api/v1"

        # Security
        # The default is regenerated on every start, which invalidates all
        # issued tokens; set SECRET_KEY in .env for any real deployment.
        SECRET_KEY: str = Field(default_factory=lambda: secrets.token_urlsafe(32))
        ALGORITHM: str = "HS256"
        ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
        REFRESH_TOKEN_EXPIRE_DAYS: int = 7

        # CORS
        BACKEND_CORS_ORIGINS: List[str] = [
            "http://localhost:3000",
            "https://localhost:3000",
        ]

        # Database
        DATABASE_URL: str = "postgresql+asyncpg://user:password@localhost/dbname"
        DATABASE_POOL_SIZE: int = 20
        DATABASE_MAX_OVERFLOW: int = 10

        # Redis (token blacklist / refresh tokens)
        REDIS_URL: str = "redis://localhost:6379/0"

        # Email
        SMTP_HOST: Optional[str] = None
        SMTP_PORT: Optional[int] = 587
        SMTP_USER: Optional[str] = None
        SMTP_PASSWORD: Optional[str] = None
        EMAILS_FROM_EMAIL: Optional[str] = None

        # Security hardening
        PASSWORD_RESET_TOKEN_EXPIRE_HOURS: int = 24
        ACCOUNT_VERIFICATION_TOKEN_EXPIRE_HOURS: int = 24
        MAX_LOGIN_ATTEMPTS: int = 5
        LOCKOUT_TIME_MINUTES: int = 15

        # Rate limiting
        RATE_LIMIT_PER_MINUTE: int = 60
        RATE_LIMIT_PER_HOUR: int = 1000

        # Environment
        ENVIRONMENT: str = "development"
        DEBUG: bool = False

        class Config:
            env_file = ".env"
            case_sensitive = True


    settings = Settings()

2.3 Secure password handling

    # src/auth/password.py
    import base64
    import hashlib
    import secrets
    from typing import Tuple

    from passlib.context import CryptContext

    # Password hashing context
    pwd_context = CryptContext(
        schemes=["argon2", "bcrypt"],  # argon2 is preferred
        deprecated="auto",
        argon2__time_cost=3,           # time cost
        argon2__memory_cost=65536,     # memory cost (64 MB)
        argon2__parallelism=4,         # parallelism
        bcrypt__rounds=12,             # bcrypt rounds
    )


    class PasswordManager:
        """Password manager."""

        @staticmethod
        def verify_password(plain_password: str, hashed_password: str) -> bool:
            """Verify a password against its hash."""
            return pwd_context.verify(plain_password, hashed_password)

        @staticmethod
        def get_password_hash(password: str) -> str:
            """Hash a password."""
            return pwd_context.hash(password)

        @staticmethod
        def validate_password_strength(password: str) -> Tuple[bool, str]:
            """Check password strength. Returns (is_valid, message)."""
            if len(password) < 8:
                return False, "Password must be at least 8 characters long"
            if len(password) > 128:
                return False, "Password must not exceed 128 characters"

            # Character classes
            has_upper = any(c.isupper() for c in password)
            has_lower = any(c.islower() for c in password)
            has_digit = any(c.isdigit() for c in password)
            has_special = any(not c.isalnum() for c in password)

            if not (has_upper and has_lower):
                return False, "Password must contain upper- and lower-case letters"
            if not has_digit:
                return False, "Password must contain a digit"
            if not has_special:
                return False, "Password must contain a special character"

            # Common weak passwords
            weak_passwords = {
                "password", "123456", "qwerty", "admin", "welcome",
                "password123", "123456789", "letmein", "monkey",
            }
            if password.lower() in weak_passwords:
                return False, "Password is too weak, please choose a stronger one"

            return True, "Password is strong enough"

        @staticmethod
        def generate_reset_token() -> str:
            """Generate a password reset token."""
            return secrets.token_urlsafe(32)

        @staticmethod
        def create_reset_token_hash(token: str) -> str:
            """Hash a reset token for storage."""
            return hashlib.sha256(token.encode()).hexdigest()

        @staticmethod
        def generate_totp_secret() -> str:
            """Generate a TOTP secret for two-factor authentication.

            TOTP libraries such as pyotp expect a base32 secret, so the
            20 random bytes are base32-encoded (32 characters) rather than
            hex-encoded.
            """
            return base64.b32encode(secrets.token_bytes(20)).decode("ascii")


    # Password policy constants
    PASSWORD_POLICY = {
        "min_length": 8,
        "max_length": 128,
        "require_upper": True,
        "require_lower": True,
        "require_digit": True,
        "require_special": True,
        "prevent_common": True,
    }

2.4 JWT handler

    # src/auth/jwt_handler.py
    import uuid
    from datetime import datetime, timedelta, timezone
    from typing import Any, Dict, Optional, Union

    import jwt
    import redis.asyncio as redis

    from src.config import settings
    from src.core.exceptions import AuthException


    class JWTManager:
        """JWT token manager."""

        def __init__(self):
            self.secret_key = settings.SECRET_KEY
            self.algorithm = settings.ALGORITHM
            self.access_token_expire = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
            self.refresh_token_expire = timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS)

            # Redis connection for the token blacklist and refresh tokens
            self.redis_client = redis.from_url(
                settings.REDIS_URL,
                encoding="utf-8",
                decode_responses=False,
            )

        def create_access_token(
            self,
            subject: Union[str, Dict],
            token_type: str = "access",
            additional_claims: Optional[Dict[str, Any]] = None,
        ) -> str:
            """Create a token.

            Args:
                subject: the subject, usually the user ID
                token_type: "access" or "refresh"
                additional_claims: extra claims to include

            Returns:
                The encoded JWT string.
            """
            now = datetime.now(timezone.utc)
            if token_type == "access":
                expire = now + self.access_token_expire
            else:
                expire = now + self.refresh_token_expire

            # Base claims
            claims = {
                "sub": str(subject),
                "iat": now,
                "exp": expire,
                "type": token_type,
                "jti": str(uuid.uuid4()),  # JWT ID, used for revocation
            }

            # Extra claims
            if additional_claims:
                claims.update(additional_claims)

            return jwt.encode(claims, self.secret_key, algorithm=self.algorithm)

        def decode_token(self, token: str) -> Dict[str, Any]:
            """Decode and validate an access token.

            Raises:
                AuthException: if the token is invalid, expired, or not an access token.
            """
            try:
                payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
            except jwt.ExpiredSignatureError:
                raise AuthException("Token has expired")
            except jwt.InvalidTokenError as e:
                raise AuthException(f"Invalid token: {e}")

            # Check the token type
            if payload.get("type") != "access":
                raise AuthException("Wrong token type")
            return payload

        async def revoke_token(self, token: str) -> None:
            """Revoke a token by adding its jti to the blacklist."""
            try:
                payload = self.decode_token(token)
            except AuthException:
                return  # an invalid token does not need to be blacklisted

            exp_timestamp = payload.get("exp")
            jti = payload.get("jti")
            if exp_timestamp and jti:
                # "exp" is a UTC epoch timestamp, so compare it with UTC "now"
                ttl = exp_timestamp - datetime.now(timezone.utc).timestamp()
                if ttl > 0:
                    # Keep the blacklist entry only until the token would expire anyway
                    await self.redis_client.setex(f"blacklist:{jti}", max(1, int(ttl)), "revoked")

        async def is_token_revoked(self, token: str) -> bool:
            """Return True if the token is revoked (or cannot be validated)."""
            try:
                payload = self.decode_token(token)
            except AuthException:
                return True

            jti = payload.get("jti")
            if not jti:
                return True

            # Check the blacklist
            result = await self.redis_client.exists(f"blacklist:{jti}")
            return bool(result)

        async def create_token_pair(self, user_id: str) -> Dict[str, Any]:
            """Create an access token and a refresh token for a user."""
            # Access token
            access_token = self.create_access_token(
                subject=user_id,
                token_type="access",
                additional_claims={"scope": "read write"},
            )

            # Refresh token
            refresh_token = self.create_access_token(
                subject=user_id,
                token_type="refresh",
                additional_claims={"scope": "refresh"},
            )

            # Store the refresh token's jti so it can be checked on refresh
            refresh_payload = jwt.decode(refresh_token, self.secret_key, algorithms=[self.algorithm])
            refresh_exp = refresh_payload.get("exp")
            if refresh_exp:
                ttl = int(refresh_exp - datetime.now(timezone.utc).timestamp())
                if ttl > 0:
                    await self.redis_client.setex(
                        f"refresh_token:{user_id}", ttl, refresh_payload.get("jti", "")
                    )

            return {
                "access_token": access_token,
                "refresh_token": refresh_token,
                "token_type": "bearer",
                "expires_in": int(self.access_token_expire.total_seconds()),
            }

        async def refresh_access_token(self, refresh_token: str) -> Dict[str, Any]:
            """Exchange a refresh token for a new access token.

            Raises:
                AuthException: if the refresh token is invalid.
            """
            try:
                payload = jwt.decode(refresh_token, self.secret_key, algorithms=[self.algorithm])
            except jwt.ExpiredSignatureError:
                raise AuthException("Refresh token has expired")
            except jwt.InvalidTokenError as e:
                raise AuthException(f"Invalid refresh token: {e}")

            # Check the token type
            if payload.get("type") != "refresh":
                raise AuthException("Invalid refresh token")

            user_id = payload.get("sub")
            jti = payload.get("jti")
            if not user_id or not jti:
                raise AuthException("Invalid refresh token")

            # Check that this is the refresh token currently stored for the user
            stored_jti = await self.redis_client.get(f"refresh_token:{user_id}")
            if not stored_jti or stored_jti.decode() != jti:
                raise AuthException("Refresh token is invalid or has expired")

            # Issue a new access token
            new_access_token = self.create_access_token(
                subject=user_id,
                token_type="access",
                additional_claims={"scope": "read write"},
            )
            return {
                "access_token": new_access_token,
                "token_type": "bearer",
                "expires_in": int(self.access_token_expire.total_seconds()),
            }


    # Global JWT manager instance
    jwt_manager = JWTManager()
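The blacklist logic in `revoke_token` and `is_token_revoked` can be tried without a Redis server. The sketch below is an in-memory stand-in, assuming a hypothetical `InMemoryBlacklist` class with an injectable clock. It mirrors the `setex`/`exists` pattern and is meant for tests and illustration only, not for multi-process deployments.

```python
import time
from typing import Callable, Dict


class InMemoryBlacklist:
    """Stores revoked jti values until the matching token would expire anyway."""

    def __init__(self, clock: Callable[[], float] = time.monotonic):
        self._clock = clock                   # injectable so tests can control time
        self._expiry: Dict[str, float] = {}   # jti -> time at which the entry lapses

    def revoke(self, jti: str, ttl_seconds: float) -> None:
        """Blacklist a jti for the remaining lifetime of its token."""
        if ttl_seconds > 0:                   # an already-expired token needs no entry
            self._expiry[jti] = self._clock() + ttl_seconds

    def is_revoked(self, jti: str) -> bool:
        """Return True while the blacklist entry is still live."""
        expires_at = self._expiry.get(jti)
        if expires_at is None:
            return False
        if self._clock() >= expires_at:
            del self._expiry[jti]             # lazy cleanup, like a Redis key TTL
            return False
        return True


fake_now = [100.0]
blacklist = InMemoryBlacklist(clock=lambda: fake_now[0])
blacklist.revoke("token-a", ttl_seconds=30)
print(blacklist.is_revoked("token-a"))  # still inside the 30-second window
fake_now[0] = 131.0
print(blacklist.is_revoked("token-a"))  # the entry has lapsed
```

Tying the entry's lifetime to the token's remaining lifetime keeps the blacklist small, because an expired token is rejected by signature validation regardless.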
2.5 Data models

    # src/models.py
    import enum
    import uuid

    from sqlalchemy import (
        Boolean, Column, DateTime, Enum, ForeignKey, Integer, String, Text, UniqueConstraint,
    )
    from sqlalchemy.dialects.postgresql import JSONB, UUID
    from sqlalchemy.orm import declarative_base, relationship
    from sqlalchemy.sql import func

    Base = declarative_base()


    class UserRole(str, enum.Enum):
        """User roles."""
        USER = "user"
        MODERATOR = "moderator"
        ADMIN = "admin"
        SUPER_ADMIN = "super_admin"


    class UserStatus(str, enum.Enum):
        """User account states."""
        ACTIVE = "active"
        INACTIVE = "inactive"
        SUSPENDED = "suspended"
        DELETED = "deleted"


    class User(Base):
        """User model."""
        __tablename__ = "users"

        id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, index=True)
        email = Column(String(255), unique=True, index=True, nullable=False)
        username = Column(String(50), unique=True, index=True, nullable=False)
        full_name = Column(String(100))

        # Password
        hashed_password = Column(String(255), nullable=False)
        password_changed_at = Column(DateTime(timezone=True))

        # Role and status
        role = Column(Enum(UserRole), default=UserRole.USER, nullable=False)
        status = Column(Enum(UserStatus), default=UserStatus.INACTIVE, nullable=False)

        # Account security
        is_email_verified = Column(Boolean, default=False)
        is_2fa_enabled = Column(Boolean, default=False)
        two_factor_secret = Column(String(100))

        # Login tracking
        last_login_at = Column(DateTime(timezone=True))
        last_login_ip = Column(String(45))  # long enough for IPv6
        failed_login_attempts = Column(Integer, default=0)
        locked_until = Column(DateTime(timezone=True))

        # Metadata
        created_at = Column(DateTime(timezone=True), server_default=func.now(), nullable=False)
        updated_at = Column(
            DateTime(timezone=True), server_default=func.now(), onupdate=func.now(), nullable=False
        )

        # Relationships
        sessions = relationship("UserSession", back_populates="user", cascade="all, delete-orphan")
        oauth_accounts = relationship("OAuthAccount", back_populates="user", cascade="all, delete-orphan")

        def __repr__(self):
            return f"<User(id={self.id}, email={self.email}, username={self.username})>"


    class UserSession(Base):
        """User session model."""
        __tablename__ = "user_sessions"

        id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, index=True)
        # The foreign key is required for the relationship() below to resolve
        user_id = Column(UUID(as_uuid=True), ForeignKey("users.id"), index=True, nullable=False)
        session_token = Column(String(255), unique=True, index=True, nullable=False)

        # Device information
        user_agent = Column(Text)
        ip_address = Column(String(45))
        device_type = Column(String(50))
        device_name = Column(String(100))
        browser = Column(String(50))
        platform = Column(String(50))

        # Session state
        is_active = Column(Boolean, default=True)
        last_activity_at = Column(DateTime(timezone=True), server_default=func.now())

        # Expiry
        expires_at = Column(DateTime(timezone=True), nullable=False)

        # Timestamps
        created_at = Column(DateTime(timezone=True), server_default=func.now())
        updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())

        # Relationships
        user = relationship("User", back_populates="sessions")


    class OAuthProvider(str, enum.Enum):
        """OAuth providers."""
        GOOGLE = "google"
        GITHUB = "github"
        FACEBOOK = "facebook"
        MICROSOFT = "microsoft"
        APPLE = "apple"


    class OAuthAccount(Base):
        """OAuth account model."""
        __tablename__ = "oauth_accounts"

        id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, index=True)
        user_id = Column(UUID(as_uuid=True), ForeignKey("users.id"), index=True, nullable=False)

        # Provider information
        provider = Column(Enum(OAuthProvider), nullable=False)
        provider_user_id = Column(String(255), nullable=False)
        provider_user_email = Column(String(255), nullable=False)

        # Provider tokens
        access_token = Column(Text, nullable=False)
        refresh_token = Column(Text)
        access_token_expires_at = Column(DateTime(timezone=True))

        # Extra data returned by the provider
        provider_data = Column(JSONB)

        # Timestamps
        created_at = Column(DateTime(timezone=True), server_default=func.now())
        updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())

        # Unique constraints
        __table_args__ = (
            # A user can have only one account per provider
            UniqueConstraint("user_id", "provider", name="uq_user_provider"),
            # A provider-side user ID must be unique per provider
            UniqueConstraint("provider", "provider_user_id", name="uq_provider_user"),
        )

        # Relationships
        user = relationship("User", back_populates="oauth_accounts")

        def __repr__(self):
            return f"<OAuthAccount(provider={self.provider}, email={self.provider_user_email})>"


    class PasswordResetToken(Base):
        """Password reset token model."""
        __tablename__ = "password_reset_tokens"

        id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, index=True)
        email = Column(String(255), nullable=False, index=True)
        token_hash = Column(String(255), nullable=False, unique=True, index=True)
        expires_at = Column(DateTime(timezone=True), nullable=False)
        is_used = Column(Boolean, default=False)
        used_at = Column(DateTime(timezone=True))
        created_at = Column(DateTime(timezone=True), server_default=func.now())


    class EmailVerificationToken(Base):
        """Email verification token model."""
        __tablename__ = "email_verification_tokens"

        id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, index=True)
        user_id = Column(UUID(as_uuid=True), nullable=False, index=True)
        token_hash = Column(String(255), nullable=False, unique=True, index=True)
        expires_at = Column(DateTime(timezone=True), nullable=False)
        is_used = Column(Boolean, default=False)
        used_at = Column(DateTime(timezone=True))
        created_at = Column(DateTime(timezone=True), server_default=func.now())

2.6 Pydantic models

    # src/schemas.py  (pydantic v1 API: validator, regex=, orm_mode)
    from datetime import datetime
    from typing import Optional
    from uuid import UUID

    from pydantic import BaseModel, EmailStr, Field, root_validator, validator


    class Token(BaseModel):
        """Token response model."""
        access_token: str
        # Optional because the refresh endpoint returns only a new access token
        refresh_token: Optional[str] = None
        token_type: str = "bearer"
        expires_in: int

        class Config:
            schema_extra = {
                "example": {
                    "access_token": "eyJhbGciOiJIUzI1NiIs...",
                    "refresh_token": "eyJhbGciOiJIUzI1NiIs...",
                    "token_type": "bearer",
                    "expires_in": 1800,
                }
            }


    class TokenData(BaseModel):
        """Decoded token claims."""
        sub: Optional[str] = None
        exp: Optional[datetime] = None
        type: Optional[str] = None
        scope: Optional[str] = None
        jti: Optional[str] = None


    class UserBase(BaseModel):
        """Shared user fields."""
        email: EmailStr
        username: str = Field(..., min_length=3, max_length=50, regex=r"^[a-zA-Z0-9_]+$")
        full_name: Optional[str] = Field(None, max_length=100)


    class UserCreate(UserBase):
        """User creation model."""
        password: str

        @validator("password")
        def validate_password(cls, v):
            from src.auth.password import PasswordManager

            is_valid, message = PasswordManager.validate_password_strength(v)
            if not is_valid:
                raise ValueError(message)
            return v

        @validator("username")
        def validate_username(cls, v):
            # Reject reserved usernames
            sensitive_words = ["admin", "root", "system", "moderator"]
            if v.lower() in sensitive_words:
                raise ValueError("Username is reserved")
            return v


    class UserLogin(BaseModel):
        """User login model."""
        email: Optional[EmailStr] = None
        username: Optional[str] = None
        password: str

        @root_validator
        def check_identifier(cls, values):
            # A per-field validator cannot see fields declared after it,
            # so the "at least one identifier" rule is checked on the whole model.
            if not values.get("email") and not values.get("username"):
                raise ValueError("Either email or username is required")
            return values


    class UserInDB(UserBase):
        """User as stored in the database."""
        id: UUID
        role: str
        status: str
        is_email_verified: bool
        is_2fa_enabled: bool
        created_at: datetime
        updated_at: datetime

        class Config:
            orm_mode = True


    class UserResponse(BaseModel):
        """User response model."""
        id: UUID
        email: EmailStr
        username: str
        full_name: Optional[str]
        role: str
        status: str
        is_email_verified: bool
        is_2fa_enabled: bool
        created_at: datetime

        class Config:
            orm_mode = True


    class PasswordChange(BaseModel):
        """Password change model."""
        current_password: str
        new_password: str

        @validator("new_password")
        def validate_new_password(cls, v):
            from src.auth.password import PasswordManager

            is_valid, message = PasswordManager.validate_password_strength(v)
            if not is_valid:
                raise ValueError(message)
            return v


    class PasswordResetRequest(BaseModel):
        """Password reset request model."""
        email: EmailStr


    class PasswordResetConfirm(BaseModel):
        """Password reset confirmation model."""
        token: str
        new_password: str

        @validator("new_password")
        def validate_new_password(cls, v):
            from src.auth.password import PasswordManager

            is_valid, message = PasswordManager.validate_password_strength(v)
            if not is_valid:
                raise ValueError(message)
            return v


    class TwoFactorEnable(BaseModel):
        """Two-factor enable model."""
        code: str = Field(..., min_length=6, max_length=6, regex=r"^[0-9]+$")


    class TwoFactorVerify(BaseModel):
        """Two-factor verification model."""
        code: str = Field(..., min_length=6, max_length=6, regex=r"^[0-9]+$")


    class OAuthLoginRequest(BaseModel):
        """OAuth login request model."""
        provider: str
        redirect_uri: str


    class OAuthCallback(BaseModel):
        """OAuth callback model."""
        code: str
        state: str


    class UserSessionResponse(BaseModel):
        """User session response model."""
        id: UUID
        user_agent: Optional[str]
        ip_address: Optional[str]
        device_type: Optional[str]
        device_name: Optional[str]
        browser: Optional[str]
        platform: Optional[str]
        last_activity_at: datetime
        created_at: datetime

        class Config:
            orm_mode = True
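The username and 2FA-code constraints above (length bounds, character pattern, reserved names) can be checked in isolation with the standard library. The following sketch assumes two hypothetical helpers, `username_error` and `is_valid_totp_code`, that mirror those rules without requiring Pydantic. It uses `fullmatch`, which is slightly stricter than a `^...$` pattern because it also rejects a trailing newline.

```python
import re
from typing import Optional

USERNAME_RE = re.compile(r"[a-zA-Z0-9_]+")
TOTP_CODE_RE = re.compile(r"[0-9]+")
RESERVED_USERNAMES = {"admin", "root", "system", "moderator"}


def username_error(username: str) -> Optional[str]:
    """Return a reason the username is rejected, or None if it is acceptable."""
    if not 3 <= len(username) <= 50:
        return "length must be between 3 and 50"
    if USERNAME_RE.fullmatch(username) is None:
        return "only letters, digits and underscores are allowed"
    if username.lower() in RESERVED_USERNAMES:
        return "reserved name"
    return None


def is_valid_totp_code(code: str) -> bool:
    """A TOTP code here is exactly six ASCII digits."""
    return len(code) == 6 and TOTP_CODE_RE.fullmatch(code) is not None


for candidate in ("alice_01", "ab", "bad name", "Admin"):
    print(candidate, "->", username_error(candidate))
```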
2.7 Authentication dependencies

    # src/auth/dependencies.py
    import uuid
    from typing import List, Optional

    from fastapi import Depends, HTTPException, status
    from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
    from sqlalchemy import select
    from sqlalchemy.ext.asyncio import AsyncSession

    from src.auth.jwt_handler import jwt_manager
    from src.database import get_db
    from src.models import User, UserRole, UserStatus
    from src.schemas import TokenData

    security = HTTPBearer(
        scheme_name="JWT",
        description="JWT bearer token",
        auto_error=False,
    )


    async def get_current_user(
        credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
        db: AsyncSession = Depends(get_db),
    ) -> User:
        """Return the currently authenticated user.

        Raises:
            HTTPException: if authentication fails.
        """
        if not credentials:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Authentication required",
                headers={"WWW-Authenticate": "Bearer"},
            )

        token = credentials.credentials
        try:
            # Reject revoked tokens
            if await jwt_manager.is_token_revoked(token):
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Token is no longer valid",
                    headers={"WWW-Authenticate": "Bearer"},
                )

            # Decode the token
            payload = jwt_manager.decode_token(token)
            token_data = TokenData(**payload)
            if token_data.sub is None:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Invalid authentication credentials",
                    headers={"WWW-Authenticate": "Bearer"},
                )

            # Load the user ("sub" holds the UUID as a string)
            user_id = uuid.UUID(token_data.sub)
            result = await db.execute(select(User).where(User.id == user_id))
            user = result.scalar_one_or_none()
            if user is None:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="User not found",
                    headers={"WWW-Authenticate": "Bearer"},
                )

            # Check the account status
            if user.status == UserStatus.SUSPENDED:
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail="Account has been suspended",
                )
            if user.status == UserStatus.DELETED:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Account has been deleted",
                )
            return user
        except HTTPException:
            raise
        except Exception as e:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail=f"Authentication failed: {e}",
                headers={"WWW-Authenticate": "Bearer"},
            )


    class PermissionChecker:
        """Role-based permission checker.

        Defined after get_current_user because Depends(get_current_user)
        is evaluated when the method signature is created.
        """

        def __init__(self, required_roles: Optional[List[UserRole]] = None):
            self.required_roles = required_roles or []

        async def __call__(self, current_user: User = Depends(get_current_user)):
            """Check that the user has one of the required roles."""
            if self.required_roles:
                user_role = UserRole(current_user.role)
                if user_role not in self.required_roles:
                    raise HTTPException(
                        status_code=status.HTTP_403_FORBIDDEN,
                        detail="Insufficient permissions",
                    )
            return current_user


    async def get_current_active_user(
        current_user: User = Depends(get_current_user),
    ) -> User:
        """Return the current user if the account is active."""
        if current_user.status != UserStatus.ACTIVE:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Account is not active",
            )
        return current_user


    async def get_current_superuser(
        current_user: User = Depends(get_current_user),
    ) -> User:
        """Return the current user if they are a super admin."""
        if current_user.role != UserRole.SUPER_ADMIN:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Super admin privileges required",
            )
        return current_user


    # Permission dependencies
    require_admin = PermissionChecker(required_roles=[UserRole.ADMIN, UserRole.SUPER_ADMIN])
    require_moderator = PermissionChecker(
        required_roles=[UserRole.MODERATOR, UserRole.ADMIN, UserRole.SUPER_ADMIN]
    )
    require_super_admin = PermissionChecker(required_roles=[UserRole.SUPER_ADMIN])


    # Optional authentication dependency
    async def get_current_user_optional(
        credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
        db: AsyncSession = Depends(get_db),
    ) -> Optional[User]:
        """Return the current user, or None if the request is not authenticated."""
        if not credentials:
            return None
        try:
            return await get_current_user(credentials, db)
        except HTTPException:
            return None

3. OAuth2 integration

3.1 OAuth2 workflow

1. Start the OAuth login
2. Redirect to the authorization page
3. The user grants authorization
4. The authorization code is returned
5. Exchange the authorization code for a token
6. The access token is returned
7. Fetch the user information
8. The user information is returned
9. Create or look up the local user
10. Generate the JWT

Participants: client, FastAPI application, OAuth provider (e.g. Google/GitHub), user database.

3.2 OAuth2 configuration management

    # src/auth/oauth.py
    import secrets
    from datetime import datetime, timedelta
    from typing import Any, Dict, Optional

    from authlib.integrations.starlette_client import OAuth
    from authlib.oauth2.rfc6749.wrappers import OAuth2Token
    from starlette.config import Config

    from src.config import settings


    class OAuthConfig:
        """OAuth configuration manager."""

        def __init__(self):
            self.oauth = OAuth()

            # Load the configuration (missing values default to "")
            config_dict = {
                # Google
                "GOOGLE_CLIENT_ID": getattr(settings, "GOOGLE_CLIENT_ID", ""),
                "GOOGLE_CLIENT_SECRET": getattr(settings, "GOOGLE_CLIENT_SECRET", ""),
                # GitHub
                "GITHUB_CLIENT_ID": getattr(settings, "GITHUB_CLIENT_ID", ""),
                "GITHUB_CLIENT_SECRET": getattr(settings, "GITHUB_CLIENT_SECRET", ""),
                # Facebook
                "FACEBOOK_CLIENT_ID": getattr(settings, "FACEBOOK_CLIENT_ID", ""),
                "FACEBOOK_CLIENT_SECRET": getattr(settings, "FACEBOOK_CLIENT_SECRET", ""),
                # Microsoft
                "MICROSOFT_CLIENT_ID": getattr(settings, "MICROSOFT_CLIENT_ID", ""),
                "MICROSOFT_CLIENT_SECRET": getattr(settings, "MICROSOFT_CLIENT_SECRET", ""),
                # Apple
                "APPLE_CLIENT_ID": getattr(settings, "APPLE_CLIENT_ID", ""),
                "APPLE_CLIENT_SECRET": getattr(settings, "APPLE_CLIENT_SECRET", ""),
                "APPLE_KEY_ID": getattr(settings, "APPLE_KEY_ID", ""),
                "APPLE_TEAM_ID": getattr(settings, "APPLE_TEAM_ID", ""),
                "APPLE_PRIVATE_KEY": getattr(settings, "APPLE_PRIVATE_KEY", ""),
            }
            config = Config(environ=config_dict)

            # Register Google OAuth
            if config_dict["GOOGLE_CLIENT_ID"]:
                self.oauth.register(
                    name="google",
                    client_id=config("GOOGLE_CLIENT_ID"),
                    client_secret=config("GOOGLE_CLIENT_SECRET"),
                    server_metadata_url="https://accounts.google.com/.well-known/openid-configuration",
                    client_kwargs={
                        "scope": "openid email profile",
                        "prompt": "select_account",
                    },
                )

            # Register GitHub OAuth
            if config_dict["GITHUB_CLIENT_ID"]:
                self.oauth.register(
                    name="github",
                    client_id=config("GITHUB_CLIENT_ID"),
                    client_secret=config("GITHUB_CLIENT_SECRET"),
                    access_token_url="https://github.com/login/oauth/access_token",
                    authorize_url="https://github.com/login/oauth/authorize",
                    api_base_url="https://api.github.com/",
                    client_kwargs={"scope": "user:email"},
                )

            # Register Facebook OAuth
            if config_dict["FACEBOOK_CLIENT_ID"]:
                self.oauth.register(
                    name="facebook",
                    client_id=config("FACEBOOK_CLIENT_ID"),
                    client_secret=config("FACEBOOK_CLIENT_SECRET"),
                    access_token_url="https://graph.facebook.com/oauth/access_token",
                    authorize_url="https://www.facebook.com/dialog/oauth",
                    api_base_url="https://graph.facebook.com/",
                    client_kwargs={"scope": "email"},
                )

            # Register Microsoft OAuth
            if config_dict["MICROSOFT_CLIENT_ID"]:
                self.oauth.register(
                    name="microsoft",
                    client_id=config("MICROSOFT_CLIENT_ID"),
                    client_secret=config("MICROSOFT_CLIENT_SECRET"),
                    server_metadata_url="https://login.microsoftonline.com/common/v2.0/.well-known/openid-configuration",
                    client_kwargs={"scope": "openid email profile"},
                )

        def get_client(self, provider: str):
            """Return the OAuth client for a provider."""
            return self.oauth.create_client(provider)

        def get_provider_scopes(self, provider: str) -> str:
            """Return the scope string configured for a provider."""
            scopes = {
                "google": "openid email profile",
                "github": "user:email",
                "facebook": "email",
                "microsoft": "openid email profile",
                "apple": "email name",
            }
            return scopes.get(provider, "")


    class OAuthService:
        """OAuth service."""

        def __init__(self, oauth_config: OAuthConfig):
            self.oauth_config = oauth_config
            self.state_store = {}  # temporary state storage; use Redis in production

        async def create_authorization_url(
            self,
            provider: str,
            redirect_uri: str,
            state: Optional[str] = None,
        ) -> Dict[str, str]:
            """Create the authorization URL.

            Args:
                provider: OAuth provider
                redirect_uri: callback URL
                state: state parameter, protects against CSRF

            Returns:
                A dict with the authorization URL and the state.
            """
            client = self.oauth_config.get_client(provider)
            if not client:
                raise ValueError(f"Unsupported OAuth provider: {provider}")

            # Generate a state if none was supplied
            if not state:
                state = secrets.token_urlsafe(16)

            # Store the state (use Redis in production)
            self.state_store[state] = {
                "provider": provider,
                "redirect_uri": redirect_uri,
                "created_at": datetime.utcnow().isoformat(),
            }

            # Build the authorization URL
            authorization_url = await client.create_authorization_url(
                redirect_uri=redirect_uri,
                state=state,
            )
            return {
                "authorization_url": authorization_url["url"],
                "state": state,
            }

        async def validate_state(self, state: str) -> bool:
            """Validate the state parameter."""
            if state not in self.state_store:
                return False

            state_data = self.state_store[state]
            created_at = datetime.fromisoformat(state_data["created_at"])

            # A state must be used within 10 minutes
            if datetime.utcnow() - created_at > timedelta(minutes=10):
                del self.state_store[state]
                return False
            return True

        async def get_access_token(
            self,
            provider: str,
            code: str,
            redirect_uri: str,
        ) -> OAuth2Token:
            """Exchange an authorization code for an access token."""
            client = self.oauth_config.get_client(provider)
            if not client:
                raise ValueError(f"Unsupported OAuth provider: {provider}")

            # Exchange the code for a token
            token = await client.fetch_access_token(code=code, redirect_uri=redirect_uri)
            return token

        async def get_user_info(
            self,
            provider: str,
            token: OAuth2Token,
        ) -> Dict[str, Any]:
            """Fetch the user's profile from the provider."""
            client = self.oauth_config.get_client(provider)
            if not client:
                raise ValueError(f"Unsupported OAuth provider: {provider}")

            # The token is passed explicitly with each request

            # Fetch the profile according to the provider
            if provider == "google":
                userinfo_endpoint = "https://openidconnect.googleapis.com/v1/userinfo"
                resp = await client.get(userinfo_endpoint, token=token)
                user_info = resp.json()
                return {
                    "id": user_info.get("sub"),
                    "email": user_info.get("email"),
                    "verified_email": user_info.get("email_verified", False),
                    "name": user_info.get("name"),
                    "given_name": user_info.get("given_name"),
                    "family_name": user_info.get("family_name"),
                    "picture": user_info.get("picture"),
                    "locale": user_info.get("locale"),
                }

            elif provider == "github":
                # GitHub needs a second request to get the email addresses
                resp = await client.get("user", token=token)
                user_info = resp.json()

                # Find the primary email
                emails_resp = await client.get("user/emails", token=token)
                emails = emails_resp.json()
                primary_email = None
                verified_email = None
                for email in emails:
                    if email.get("primary"):
                        primary_email = email.get("email")
                    if email.get("verified") and not primary_email:
                        verified_email = email.get("email")
                email = primary_email or verified_email or user_info.get("email")

                return {
                    "id": str(user_info.get("id")),
                    "email": email,
                    "verified_email": bool(email),
                    "name": user_info.get("name"),
                    "login": user_info.get("login"),
                    "avatar_url": user_info.get("avatar_url"),
                    "bio": user_info.get("bio"),
                    "location": user_info.get("location"),
                }

            elif provider == "facebook":
                # Facebook requires the fields to be listed explicitly
                resp = await client.get(
                    "me", params={"fields": "id,name,email,picture"}, token=token
                )
                user_info = resp.json()
                return {
                    "id": user_info.get("id"),
                    "email": user_info.get("email"),
                    "verified_email": True,  # Facebook emails are treated as verified
                    "name": user_info.get("name"),
                    "picture": user_info.get("picture", {}).get("data", {}).get("url"),
                }

            elif provider == "microsoft":
                resp = await client.get("https://graph.microsoft.com/v1.0/me", token=token)
                user_info = resp.json()
                return {
                    "id": user_info.get("id"),
                    "email": user_info.get("mail") or user_info.get("userPrincipalName"),
                    "verified_email": True,  # Microsoft emails are treated as verified
                    "name": user_info.get("displayName"),
                    "given_name": user_info.get("givenName"),
                    "family_name": user_info.get("surname"),
                }

            else:
                raise ValueError(f"Unsupported provider: {provider}")

        async def cleanup_state(self, state: str):
            """Remove a state once it has been used."""
            if state in self.state_store:
                del self.state_store[state]


    # Global OAuth service instance
    oauth_config = OAuthConfig()
    oauth_service = OAuthService(oauth_config)

3.3 OAuth database service

    # src/services/oauth_service.py
    import random
    import re
    import secrets
    from datetime import datetime, timezone
    from typing import Any, Dict, Optional

    from sqlalchemy import select
    from sqlalchemy.ext.asyncio import AsyncSession

    from src.auth.password import PasswordManager
    from src.models import OAuthAccount, OAuthProvider, User, UserStatus


    def _expires_at_to_datetime(oauth_token: Dict[str, Any]) -> Optional[datetime]:
        """Convert the token's epoch "expires_at" into an aware datetime for the DateTime column."""
        expires_at = oauth_token.get("expires_at")
        if not expires_at:
            return None
        return datetime.fromtimestamp(expires_at, tz=timezone.utc)


    class OAuthUserService:
        """OAuth user service."""

        def __init__(self, db: AsyncSession):
            self.db = db

        async def get_or_create_user_from_oauth(
            self,
            provider: OAuthProvider,
            provider_user_id: str,
            provider_user_email: str,
            oauth_token: Dict[str, Any],
            user_info: Dict[str, Any],
        ) -> User:
            """Look up or create a local user from OAuth data.

            Args:
                provider: OAuth provider
                provider_user_id: user ID at the provider
                provider_user_email: user email at the provider
                oauth_token: OAuth token data
                user_info: profile data

            Returns:
                The user object.
            """
            # 1. Look for an existing OAuth account
            oauth_account = await self.find_oauth_account(provider, provider_user_id)
            if oauth_account:
                # Update the stored provider tokens
                await self.update_oauth_account(oauth_account, oauth_token)
                # Note: with AsyncSession this likely requires `user` to be eagerly loaded
                return oauth_account.user

            # 2. Look for an existing user by email
            user = await self.find_user_by_email(provider_user_email)
            if user:
                # Attach a new OAuth account to the user
                await self.create_oauth_account(
                    user=user,
                    provider=provider,
                    provider_user_id=provider_user_id,
                    provider_user_email=provider_user_email,
                    oauth_token=oauth_token,
                    user_info=user_info,
                )
                return user

            # 3. Create a new user
            user = await self.create_user_from_oauth(
                provider=provider,
                provider_user_id=provider_user_id,
                provider_user_email=provider_user_email,
                oauth_token=oauth_token,
                user_info=user_info,
            )
            return user

        async def find_oauth_account(
            self,
            provider: OAuthProvider,
            provider_user_id: str,
        ) -> Optional[OAuthAccount]:
            """Find an OAuth account."""
            result = await self.db.execute(
                select(OAuthAccount).where(
                    OAuthAccount.provider == provider,
                    OAuthAccount.provider_user_id == provider_user_id,
                )
            )
            return result.scalar_one_or_none()

        async def find_user_by_email(self, email: str) -> Optional[User]:
            """Find a user by email."""
            result = await self.db.execute(select(User).where(User.email == email))
            return result.scalar_one_or_none()

        async def update_oauth_account(
            self,
            oauth_account: OAuthAccount,
            oauth_token: Dict[str, Any],
        ):
            """Update the stored OAuth tokens."""
            oauth_account.access_token = oauth_token.get("access_token")
            oauth_account.refresh_token = oauth_token.get("refresh_token")

            # Update the access token expiry
            expires_at = _expires_at_to_datetime(oauth_token)
            if expires_at:
                oauth_account.access_token_expires_at = expires_at

            oauth_account.updated_at = datetime.now(timezone.utc)
            await self.db.commit()

        async def create_oauth_account(
            self,
            user: User,
            provider: OAuthProvider,
            provider_user_id: str,
            provider_user_email: str,
            oauth_token: Dict[str, Any],
            user_info: Dict[str, Any],
        ):
            """Create an OAuth account."""
            oauth_account = OAuthAccount(
                user_id=user.id,
                provider=provider,
                provider_user_id=provider_user_id,
                provider_user_email=provider_user_email,
                access_token=oauth_token.get("access_token"),
                refresh_token=oauth_token.get("refresh_token"),
                access_token_expires_at=_expires_at_to_datetime(oauth_token),
                provider_data=user_info,
            )
            self.db.add(oauth_account)
            await self.db.commit()

        async def create_user_from_oauth(
            self,
            provider: OAuthProvider,
            provider_user_id: str,
            provider_user_email: str,
            oauth_token: Dict[str, Any],
            user_info: Dict[str, Any],
        ) -> User:
            """Create a user from OAuth data."""
            # Generate a username
            username = self.generate_username(user_info)

            # Hash a random password (the user can reset it by email)
            random_password = PasswordManager.get_password_hash(secrets.token_urlsafe(32))

            # Create the user
            user = User(
                email=provider_user_email,
                username=username,
                full_name=user_info.get("name"),
                hashed_password=random_password,
                is_email_verified=user_info.get("verified_email", True),
                status=UserStatus.ACTIVE,
            )
            self.db.add(user)
            await self.db.commit()
            await self.db.refresh(user)

            # Create the OAuth account
            await self.create_oauth_account(
                user=user,
                provider=provider,
                provider_user_id=provider_user_id,
                provider_user_email=provider_user_email,
                oauth_token=oauth_token,
                user_info=user_info,
            )
            return user

        def generate_username(self, user_info: Dict[str, Any]) -> str:
            """Generate a username from the profile data."""
            username = None
            if "login" in user_info:  # GitHub
                username = user_info["login"]
            elif "given_name" in user_info:  # Google/Microsoft
                given_name = (user_info.get("given_name") or "").lower()
                family_name = (user_info.get("family_name") or "").lower()
                if given_name and family_name:
                    username = f"{given_name}.{family_name}"
                elif given_name:
                    username = given_name

            # Fall back to the local part of the email address
            if not username:
                email = user_info.get("email") or "user"
                username = email.split("@")[0]

            # Keep only characters that are legal in a username
            username = re.sub(r"[^a-zA-Z0-9_]", "", username)

            # Truncate first so the random suffix is never cut off, then append it.
            # The suffix reduces collisions but does not guarantee uniqueness.
            username = username[:47]
            if len(username) < 3:
                username += str(random.randint(100, 999))
            else:
                username += str(random.randint(10, 99))
            return username
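The in-memory `state_store` in section 3.2 validates a state and removes it in two separate calls. The sketch below combines both into one single-use `consume` step with a time limit. The `StateStore` class, its method names, and the injectable clock are assumptions made for this illustration, and a shared store such as Redis would still be needed when several worker processes handle callbacks.

```python
import secrets
import time
from typing import Callable, Dict, Tuple


class StateStore:
    """Issues OAuth `state` values that can be redeemed once, within a time limit."""

    def __init__(self, ttl_seconds: float = 600, clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock                              # injectable for tests
        self._entries: Dict[str, Tuple[str, float]] = {}  # state -> (provider, issued_at)

    def issue(self, provider: str) -> str:
        """Create an unguessable state bound to one provider."""
        state = secrets.token_urlsafe(16)
        self._entries[state] = (provider, self._clock())
        return state

    def consume(self, state: str, provider: str) -> bool:
        """Validate and remove the state; a second call with the same state fails."""
        entry = self._entries.pop(state, None)
        if entry is None:
            return False                                 # unknown or already used
        stored_provider, issued_at = entry
        if self._clock() - issued_at > self._ttl:
            return False                                 # older than the time limit
        return stored_provider == provider


store = StateStore()
state = store.issue("github")
print(store.consume(state, "github"))  # first use
print(store.consume(state, "github"))  # replay of the same state
```

Removing the entry on the first lookup means a replayed callback fails even when it arrives inside the ten-minute window.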
API端点实现4.1 认证端点# src/api/v1/endpoints/auth.pyfromfastapiimportAPIRouter,Depends,HTTPException,status,BackgroundTasksfromfastapi.securityimportOAuth2PasswordRequestFormfromsqlalchemy.ext.asyncioimportAsyncSessionfromtypingimportDict,Anyimportpyotpimportqrcodeimportioimportbase64fromsrc.databaseimportget_dbfromsrc.auth.dependenciesimportget_current_userfromsrc.auth.jwt_handlerimportjwt_managerfromsrc.auth.passwordimportPasswordManagerfromsrc.schemasimport(UserCreate,UserLogin,Token,PasswordChange,PasswordResetRequest,PasswordResetConfirm,TwoFactorEnable,TwoFactorVerify,OAuthLoginRequest,OAuthCallback,UserResponse)fromsrc.modelsimportUser,UserSession,UserStatusfromsrc.services.user_serviceimportUserServicefromsrc.services.oauth_serviceimportOAuthUserServicefromsrc.auth.oauthimportoauth_servicefromsrc.utils.emailimportsend_password_reset_email,send_verification_email routerAPIRouter()router.post(/register,response_modelUserResponse,status_codestatus.HTTP_201_CREATED)asyncdefregister(user_in:UserCreate,background_tasks:BackgroundTasks,db:AsyncSessionDepends(get_db)): 用户注册 user_serviceUserService(db)# 检查邮箱是否已存在ifawaituser_service.get_user_by_email(user_in.email):raiseHTTPException(status_codestatus.HTTP_400_BAD_REQUEST,detail邮箱已被注册)# 检查用户名是否已存在ifawaituser_service.get_user_by_username(user_in.username):raiseHTTPException(status_codestatus.HTTP_400_BAD_REQUEST,detail用户名已被使用)# 创建用户userawaituser_service.create_user(user_in)# 发送验证邮件ifsettings.EMAILS_ENABLED:background_tasks.add_task(send_verification_email,email_touser.email,usernameuser.username,user_iduser.id)returnuserrouter.post(/login,response_modelToken)asyncdeflogin(form_data:OAuth2PasswordRequestFormDepends(),db:AsyncSessionDepends(get_db)): 用户登录使用用户名/邮箱和密码 user_serviceUserService(db)# 获取用户userawaituser_service.authenticate_user(identifierform_data.username,# 
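        # The username field may hold either an email address or a username
        password=form_data.password,
    )
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="用户名或密码错误",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Check the account status
    if user.status != UserStatus.ACTIVE:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="账户未激活")

    # Check whether the account is temporarily locked
    if user.locked_until and user.locked_until > datetime.utcnow():
        raise HTTPException(status_code=status.HTTP_423_LOCKED, detail="账户已被锁定请稍后重试")

    # If two-factor authentication is enabled, signal that a second step is required.
    # NOTE: this placeholder dict does not match response_model=Token; a real
    # implementation needs a dedicated response schema for this case.
    if user.is_2fa_enabled:
        return {"requires_2fa": True, "message": "需要双因素认证"}

    # Record the successful login
    await user_service.update_login_info(
        user_id=user.id,
        login_success=True,
        ip_address=None,  # should be taken from the request in practice
    )

    # Create a session
    session = await user_service.create_session(
        user_id=user.id,
        user_agent="",  # should be read from the request headers
        ip_address=None,
    )

    # Issue the token pair
    token_data = await jwt_manager.create_token_pair(str(user.id))
    return token_data


@router.post("/login/2fa", response_model=Token)
async def verify_2fa(
    verify_data: TwoFactorVerify,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Verify a two-factor authentication code."""
    user_service = UserService(db)

    # Verify the TOTP code
    if not user_service.verify_totp_code(
        secret=current_user.two_factor_secret,
        code=verify_data.code,
    ):
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="验证码错误")

    # Issue the token pair
    token_data = await jwt_manager.create_token_pair(str(current_user.id))
    return token_data


@router.post("/refresh", response_model=Token)
async def refresh_token(
    # Body (imported from fastapi) reads the token from the JSON body,
    # which is how the test suite sends it.
    refresh_token: str = Body(..., embed=True),
    db: AsyncSession = Depends(get_db),
):
    """Refresh the access token."""
    try:
        token_data = await jwt_manager.refresh_access_token(refresh_token)
        return token_data
    except Exception as e:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=str(e))


@router.post("/logout")
async def logout(
    current_user: User = Depends(get_current_user),
    token: str = Depends(get_current_user_token),
):
    """Log the user out by revoking the presented token."""
    await jwt_manager.revoke_token(token)
    return {"message": "登出成功"}


@router.post("/password/change")
async def change_password(
    password_data: PasswordChange,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Change the current user's password."""
    user_service = UserService(db)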
    # Verify the current password
    if not PasswordManager.verify_password(
        password_data.current_password,
        current_user.hashed_password,
    ):
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="当前密码错误")

    # Update the password
    await user_service.update_password(
        user_id=current_user.id,
        new_password=password_data.new_password,
    )

    # Invalidate every session
    await user_service.invalidate_all_sessions(current_user.id)
    return {"message": "密码修改成功"}


@router.post("/password/reset/request")
async def request_password_reset(
    reset_request: PasswordResetRequest,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db),
):
    """Request a password reset."""
    user_service = UserService(db)

    # Look up the user
    user = await user_service.get_user_by_email(reset_request.email)
    if not user:
        # Return success even for unknown addresses to prevent account enumeration
        return {"message": "如果邮箱存在重置链接已发送"}

    # Generate the reset token
    reset_token = await user_service.create_password_reset_token(user.email)

    # Send the reset email (EMAILS_ENABLED is read defensively; it is not
    # part of the Settings class shown earlier)
    if getattr(settings, "EMAILS_ENABLED", False):
        background_tasks.add_task(
            send_password_reset_email,
            email_to=user.email,
            username=user.username,
            token=reset_token,
        )
    return {"message": "如果邮箱存在重置链接已发送"}


@router.post("/password/reset/confirm")
async def confirm_password_reset(
    reset_data: PasswordResetConfirm,
    db: AsyncSession = Depends(get_db),
):
    """Confirm a password reset."""
    user_service = UserService(db)

    # Validate the reset token
    user = await user_service.verify_password_reset_token(token=reset_data.token)
    if not user:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="无效或过期的重置令牌")

    # Update the password
    await user_service.update_password(user_id=user.id, new_password=reset_data.new_password)

    # Invalidate every session
    await user_service.invalidate_all_sessions(user.id)
    return {"message": "密码重置成功"}


@router.post("/2fa/enable", response_model=Dict[str, Any])
async def enable_2fa(
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Start enabling two-factor authentication."""
    user_service = UserService(db)

    # Generate the TOTP secret
    totp_secret = PasswordManager.generate_totp_secret()

    # Build the provisioning URI that the QR code will carry
    totp = pyotp.TOTP(totp_secret)
    provisioning_uri = totp.provisioning_uri(
        name=current_user.email,
        issuer_name=settings.APP_NAME,
    )
    # Render the QR code image
    qr = qrcode.QRCode(
        version=1,
        error_correction=qrcode.constants.ERROR_CORRECT_L,
        box_size=10,
        border=4,
    )
    qr.add_data(provisioning_uri)
    qr.make(fit=True)
    img = qr.make_image(fill_color="black", back_color="white")

    # Convert to base64
    buffered = io.BytesIO()
    img.save(buffered, format="PNG")
    qr_code_base64 = base64.b64encode(buffered.getvalue()).decode()

    # Store the secret temporarily until the user confirms it
    await user_service.set_temporary_2fa_secret(user_id=current_user.id, secret=totp_secret)
    return {
        "secret": totp_secret,
        "provisioning_uri": provisioning_uri,
        "qr_code": f"data:image/png;base64,{qr_code_base64}",
        "message": "扫描二维码并输入验证码以完成设置",
    }


@router.post("/2fa/confirm")
async def confirm_2fa(
    verify_data: TwoFactorEnable,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Confirm and activate two-factor authentication."""
    user_service = UserService(db)

    # Fetch the temporarily stored secret
    temp_secret = await user_service.get_temporary_2fa_secret(current_user.id)
    if not temp_secret:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="请先请求启用2FA")

    # Verify the TOTP code
    if not user_service.verify_totp_code(secret=temp_secret, code=verify_data.code):
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="验证码错误")

    # Enable 2FA
    await user_service.enable_2fa(user_id=current_user.id, secret=temp_secret)
    return {"message": "双因素认证已启用"}


@router.post("/2fa/disable")
async def disable_2fa(
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Disable two-factor authentication."""
    user_service = UserService(db)
    await user_service.disable_2fa(current_user.id)
    return {"message": "双因素认证已禁用"}


@router.get("/oauth/{provider}/login")
async def oauth_login(provider: str, redirect_uri: str, request: Request):
    """Start an OAuth login."""
    # Build the authorization URL
    auth_data = await oauth_service.create_authorization_url(
        provider=provider,
        redirect_uri=redirect_uri,
    )
    return {
        "authorization_url": auth_data["authorization_url"],
        "state": auth_data["state"],
    }


@router.get("/oauth/{provider}/callback")
async def oauth_callback(
    provider: str,
    code: str,
    state: str,
    db: AsyncSession = Depends(get_db),
):
    """Handle the OAuth callback."""
    # Validate the state parameter
    if not await oauth_service.validate_state(state):
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="无效的state参数")

    try:
        # Fetch the data stored for this state
        state_data = oauth_service.state_store.get(state)
        if not state_data:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="无效的state参数")
        redirect_uri = state_data["redirect_uri"]

        # Exchange the code for an access token
        oauth_token = await oauth_service.get_access_token(
            provider=provider,
            code=code,
            redirect_uri=redirect_uri,
        )

        # Fetch the user's profile
        user_info = await oauth_service.get_user_info(provider, oauth_token)

        # Get or create the local user
        oauth_user_service = OAuthUserService(db)
        user = await oauth_user_service.get_or_create_user_from_oauth(
            provider=OAuthProvider(provider),
            provider_user_id=user_info["id"],
            provider_user_email=user_info["email"],
            oauth_token=oauth_token,
            user_info=user_info,
        )

        # Issue the JWT token pair
        token_data = await jwt_manager.create_token_pair(str(user.id))

        # Clean up the state
        await oauth_service.cleanup_state(state)
        return token_data
    except HTTPException:
        # Let deliberate HTTP errors through instead of re-wrapping them
        raise
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"OAuth认证失败: {str(e)}",
        )


@router.get("/me", response_model=UserResponse)
async def get_current_user_info(current_user: User = Depends(get_current_user)):
    """Return the current user's profile."""
    return current_user


@router.get("/sessions")
async def get_user_sessions(
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """List all of the user's sessions."""
    user_service = UserService(db)
    sessions = await user_service.get_user_sessions(current_user.id)
    return sessions


@router.post("/sessions/{session_id}/revoke")
async def revoke_session(
    session_id: str,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Revoke one specific session."""
    user_service = UserService(db)
    success = await user_service.revoke_session(user_id=current_user.id, session_id=session_id)
    if not success:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="会话不存在")
    return {"message": "会话已撤销"}


@router.post("/sessions/revoke-all")
async def revoke_all_sessions(
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Revoke every session except the current one."""
    user_service = UserService(db)
    await user_service.invalidate_all_sessions(user_id=current_user.id, exclude_current=True)
    return {"message": "所有其他会话已撤销"}

4.2 User Management Endpoints
# src/api/v1/endpoints/users.py
from typing import List, Optional

from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy.ext.asyncio import AsyncSession

from src.database import get_db
from src.auth.dependencies import (
    get_current_user,
    require_admin,
    require_super_admin,
    PermissionChecker,
)
from src.models import User, UserRole, UserStatus
from src.schemas import UserResponse
from src.services.user_service import UserService

router = APIRouter()


@router.get("/", response_model=List[UserResponse])
async def list_users(
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=1000),
    role: Optional[UserRole] = None,
    status: Optional[UserStatus] = None,
    search: Optional[str] = None,
    current_user: User = Depends(require_admin),
    db: AsyncSession = Depends(get_db),
):
    """List users (requires admin privileges)."""
    user_service = UserService(db)
    users = await user_service.get_users(
        skip=skip, limit=limit, role=role, status=status, search=search,
    )
    return users


@router.get("/{user_id}", response_model=UserResponse)
async def get_user(
    user_id: str,
    current_user: User = Depends(require_admin),
    db: AsyncSession = Depends(get_db),
):
    """Fetch one user (requires admin privileges)."""
    user_service = UserService(db)
    user = await user_service.get_user_by_id(user_id)
    if not user:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="用户不存在")
    return user


@router.patch("/{user_id}/role")
async def update_user_role(
    user_id: str,
    new_role: UserRole,
    current_user: User = Depends(require_super_admin),
    db: AsyncSession = Depends(get_db),
):
    """Update a user's role (requires super-admin privileges)."""
    user_service = UserService(db)

    # You cannot change your own role
    if str(current_user.id) == user_id:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="不能修改自己的角色")

    success = await user_service.update_user_role(user_id, new_role)
    if not success:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="用户不存在")
    return {"message": "用户角色更新成功"}


@router.patch("/{user_id}/status")
async def update_user_status(
    user_id: str,
    new_status: UserStatus,
    current_user: User = Depends(require_admin),
    db: AsyncSession = Depends(get_db),
):
    """Update a user's status (requires admin privileges)."""
    user_service = UserService(db)
    # You cannot change your own status
    if str(current_user.id) == user_id:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="不能修改自己的状态")

    success = await user_service.update_user_status(user_id, new_status)
    if not success:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="用户不存在")
    return {"message": "用户状态更新成功"}


@router.delete("/{user_id}")
async def delete_user(
    user_id: str,
    current_user: User = Depends(require_super_admin),
    db: AsyncSession = Depends(get_db),
):
    """Delete a user (requires super-admin privileges)."""
    user_service = UserService(db)

    # You cannot delete yourself
    if str(current_user.id) == user_id:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="不能删除自己")

    success = await user_service.delete_user(user_id)
    if not success:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="用户不存在")
    return {"message": "用户删除成功"}

5. Advanced Security Features

5.1 Two-Factor Authentication

# src/services/two_factor_service.py
import hashlib
import secrets
from datetime import datetime
from typing import Optional

import pyotp


class TwoFactorService:
    """Two-factor authentication helpers."""

    @staticmethod
    def generate_secret() -> str:
        """Generate a TOTP secret."""
        return pyotp.random_base32()

    @staticmethod
    def generate_provisioning_uri(secret: str, email: str, issuer: str) -> str:
        """Build the provisioning URI that is encoded into the QR code."""
        totp = pyotp.TOTP(secret)
        return totp.provisioning_uri(name=email, issuer_name=issuer)

    @staticmethod
    def verify_code(secret: str, code: str) -> bool:
        """Verify a TOTP code."""
        totp = pyotp.TOTP(secret)
        # valid_window=1 tolerates clock drift of one 30-second step either way
        return totp.verify(code, valid_window=1)

    @staticmethod
    def generate_backup_codes(count: int = 10) -> list:
        """Generate one-time backup codes."""
        codes = []
        for _ in range(count):
            # Each code is 8 random digits
            code = "".join(secrets.choice("0123456789") for _ in range(8))
            codes.append(code)
        return codes

    @staticmethod
    def hash_backup_code(code: str) -> str:
        """Hash a backup code so it can be stored safely."""
        return hashlib.sha256(code.encode()).hexdigest()


class TwoFactorManager:
    """Two-factor authentication manager."""

    def __init__(self, db):
        self.db = db
        self.two_factor_service = TwoFactorService()

    async def enable_2fa(self, user_id: str, secret: str) -> Optional[list]:
        """Enable 2FA; returns the plaintext backup codes, or None if the user is missing."""
        from src.models import User

        user = await self.db.get(User, user_id)
        if not user:
            return None

        user.is_2fa_enabled = True
        user.two_factor_secret = secret
        user.two_factor_enabled_at = datetime.utcnow()
        # Generate backup codes and store only their hashes
        backup_codes = self.two_factor_service.generate_backup_codes()
        user.backup_codes = [
            self.two_factor_service.hash_backup_code(code) for code in backup_codes
        ]
        await self.db.commit()

        # Return the plaintext backup codes; they are shown only once
        return backup_codes

    async def disable_2fa(self, user_id: str) -> bool:
        """Disable two-factor authentication."""
        from src.models import User

        user = await self.db.get(User, user_id)
        if not user:
            return False

        user.is_2fa_enabled = False
        user.two_factor_secret = None
        user.two_factor_enabled_at = None
        user.backup_codes = []
        await self.db.commit()
        return True

    async def verify_2fa(self, user_id: str, code: str, use_backup_code: bool = False) -> bool:
        """Verify a TOTP code or a backup code."""
        from src.models import User

        user = await self.db.get(User, user_id)
        if not user or not user.is_2fa_enabled:
            return False

        if use_backup_code:
            # Verify the backup code
            code_hash = self.two_factor_service.hash_backup_code(code)
            if code_hash in user.backup_codes:
                # Remove the used code. The list is reassigned rather than mutated
                # in place so that SQLAlchemy detects the change on a plain
                # JSON/ARRAY column.
                user.backup_codes = [c for c in user.backup_codes if c != code_hash]
                await self.db.commit()
                return True
            return False
        else:
            # Verify the TOTP code
            return self.two_factor_service.verify_code(user.two_factor_secret, code)

    async def regenerate_backup_codes(self, user_id: str) -> list:
        """Regenerate the backup codes."""
        from src.models import User

        user = await self.db.get(User, user_id)
        if not user or not user.is_2fa_enabled:
            return []

        backup_codes = self.two_factor_service.generate_backup_codes()
        user.backup_codes = [
            self.two_factor_service.hash_backup_code(code) for code in backup_codes
        ]
        await self.db.commit()
        return backup_codes

5.2 Session Management

# src/services/session_service.py
import secrets
from datetime import datetime, timedelta
from typing import List, Optional

import ua_parser.user_agent_parser as ua_parser
from sqlalchemy import select


class SessionService:
    """Session management service."""

    def __init__(self, db):
        self.db = db

    async def create_session(
        self,
        user_id: str,
        user_agent: Optional[str] = None,
        ip_address: Optional[str] = None,
        expires_in_hours: int = 24 * 7,  # 7 days by default
    ) -> str:
        """Create a user session and return its token."""
        from src.models import UserSession

        # Parse the User-Agent
        device_info = self.parse_user_agent(user_agent)

        # Generate the session token
        session_token = secrets.token_urlsafe(32)
        # Create the session record
        session = UserSession(
            user_id=user_id,
            session_token=session_token,
            user_agent=user_agent,
            ip_address=ip_address,
            device_type=device_info.get("device_type"),
            device_name=device_info.get("device_name"),
            browser=device_info.get("browser"),
            platform=device_info.get("platform"),
            expires_at=datetime.utcnow() + timedelta(hours=expires_in_hours),
        )
        self.db.add(session)
        await self.db.commit()
        return session_token

    def parse_user_agent(self, user_agent: Optional[str]) -> dict:
        """Parse a User-Agent string into coarse device information."""
        if not user_agent:
            return {}
        try:
            parsed = ua_parser.Parse(user_agent)
            device_type = "desktop"
            device_name = ""

            # Classify the device. This is a rough heuristic: any device family
            # other than "Other" is treated as mobile, which may mislabel some
            # desktop user agents.
            device_family = parsed["device"]["family"]
            if device_family == "Spider":
                device_type = "bot"
            elif device_family != "Other":
                device_type = "mobile"
                device_name = device_family

            return {
                "device_type": device_type,
                "device_name": device_name,
                "browser": parsed["user_agent"]["family"],
                "platform": parsed["os"]["family"],
            }
        except Exception:
            return {}

    async def validate_session(
        self,
        session_token: str,
        update_last_activity: bool = True,
    ) -> Optional[str]:
        """Validate a session token and return the user ID."""
        from src.models import UserSession

        result = await self.db.execute(
            select(UserSession).where(
                UserSession.session_token == session_token,
                UserSession.is_active.is_(True),
                UserSession.expires_at > datetime.utcnow(),
            )
        )
        session = result.scalar_one_or_none()
        if not session:
            return None

        # Update the last-activity timestamp
        if update_last_activity:
            session.last_activity_at = datetime.utcnow()
            await self.db.commit()
        return str(session.user_id)

    async def revoke_session(self, user_id: str, session_id: str) -> bool:
        """Revoke one specific session."""
        from src.models import UserSession

        result = await self.db.execute(
            select(UserSession).where(
                UserSession.id == session_id,
                UserSession.user_id == user_id,
                UserSession.is_active.is_(True),
            )
        )
        session = result.scalar_one_or_none()
        if not session:
            return False
        session.is_active = False
        await self.db.commit()
        return True

    async def revoke_all_sessions(
        self,
        user_id: str,
        exclude_current: Optional[str] = None,
    ) -> int:
        """Revoke all of a user's sessions, optionally sparing one session token."""
        from src.models import UserSession
        query = select(UserSession).where(
            UserSession.user_id == user_id,
            UserSession.is_active.is_(True),
        )
        if exclude_current:
            query = query.where(UserSession.session_token != exclude_current)

        result = await self.db.execute(query)
        sessions = result.scalars().all()
        count = 0
        for session in sessions:
            session.is_active = False
            count += 1
        await self.db.commit()
        return count

    async def get_user_sessions(self, user_id: str, active_only: bool = True) -> List[dict]:
        """List a user's sessions."""
        from src.models import UserSession

        query = select(UserSession).where(UserSession.user_id == user_id)
        if active_only:
            query = query.where(
                UserSession.is_active.is_(True),
                UserSession.expires_at > datetime.utcnow(),
            )
        result = await self.db.execute(query)
        sessions = result.scalars().all()
        return [
            {
                "id": str(session.id),
                "user_agent": session.user_agent,
                "ip_address": session.ip_address,
                "device_type": session.device_type,
                "device_name": session.device_name,
                "browser": session.browser,
                "platform": session.platform,
                "is_active": session.is_active,
                "last_activity_at": session.last_activity_at,
                "created_at": session.created_at,
                "expires_at": session.expires_at,
            }
            for session in sessions
        ]

    async def cleanup_expired_sessions(self) -> int:
        """Deactivate expired sessions."""
        from src.models import UserSession

        result = await self.db.execute(
            select(UserSession).where(
                UserSession.expires_at < datetime.utcnow(),
                UserSession.is_active.is_(True),
            )
        )
        sessions = result.scalars().all()
        count = 0
        for session in sessions:
            session.is_active = False
            count += 1
        await self.db.commit()
        return count

5.3 Security Middleware

# src/middleware/security.py
import time
from typing import Dict, Tuple

import redis.asyncio as redis
from fastapi import HTTPException, Request
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse, Response


class SecurityMiddleware(BaseHTTPMiddleware):
    """Security middleware: rate limiting, suspicious-activity tracking, security headers."""

    def __init__(
        self,
        app,
        redis_client: redis.Redis,
        rate_limit_per_minute: int = 60,
        rate_limit_per_hour: int = 1000,
    ):
        super().__init__(app)
        self.redis_client = redis_client
        self.rate_limit_per_minute = rate_limit_per_minute
        self.rate_limit_per_hour = rate_limit_per_hour

    async def dispatch(self, request: Request, call_next):
        # Security checks. An HTTPException raised inside BaseHTTPMiddleware is not
        # seen by FastAPI's exception handlers, so it is converted to a response here.
        try:
            await self.check_rate_limit(request)
            await self.check_suspicious_activity(request)
        except HTTPException as exc:
            return JSONResponse(
                status_code=exc.status_code,
                content={"detail": exc.detail},
                headers=exc.headers,
            )
        # Add security headers
        response = await call_next(request)
        response = self.add_security_headers(response)
        return response

    async def check_rate_limit(self, request: Request):
        """Enforce per-minute and per-hour rate limits per client IP and path."""
        client_ip = request.client.host if request.client else "unknown"
        path = request.url.path

        # Per-minute limit
        minute_key = f"rate_limit:{client_ip}:{path}:minute"
        minute_count = await self.redis_client.get(minute_key)
        if minute_count and int(minute_count) >= self.rate_limit_per_minute:
            raise HTTPException(
                status_code=429,
                detail="请求过于频繁请稍后再试",
                headers={"Retry-After": "60"},
            )

        # Per-hour limit
        hour_key = f"rate_limit:{client_ip}:{path}:hour"
        hour_count = await self.redis_client.get(hour_key)
        if hour_count and int(hour_count) >= self.rate_limit_per_hour:
            raise HTTPException(
                status_code=429,
                detail="每小时请求次数超限",
                headers={"Retry-After": "3600"},
            )

        # Update the counters
        pipe = self.redis_client.pipeline()
        pipe.incr(minute_key)
        pipe.expire(minute_key, 60)
        pipe.incr(hour_key)
        pipe.expire(hour_key, 3600)
        await pipe.execute()

    async def check_suspicious_activity(self, request: Request):
        """Track suspicious activity."""
        client_ip = request.client.host if request.client else "unknown"

        # Flag user agents that look like scripts or scanners
        user_agent = request.headers.get("user-agent", "")
        suspicious_agents = ["curl", "wget", "python-requests", "scan"]
        if any(agent in user_agent.lower() for agent in suspicious_agents):
            # Record the suspicious activity
            await self.redis_client.incr(f"suspicious:{client_ip}")

    def add_security_headers(self, response: Response) -> Response:
        """Attach security headers to the response."""
        security_headers = {
            "X-Content-Type-Options": "nosniff",
            "X-Frame-Options": "DENY",
            "X-XSS-Protection": "1; mode=block",
            "Strict-Transport-Security": "max-age=31536000; includeSubDomains",
            "Content-Security-Policy": "default-src 'self'",
            "Referrer-Policy": "strict-origin-when-cross-origin",
            "Permissions-Policy": "geolocation=(), microphone=(), camera=()",
        }
        for header, value in security_headers.items():
            response.headers[header] = value
        return response


class LoginAttemptTracker:
    """Tracks failed login attempts in Redis."""

    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client

    async def record_failed_attempt(self, identifier: str) -> Tuple[int, bool]:
        """Record a failed login attempt.

        Returns:
            (failure count, whether the account should be locked)
        """
        key = f"login_failures:{identifier}"

        # Read the current failure count
        failures = await self.redis_client.get(key)
        failure_count = int(failures) if failures else 0
        # Increment the failure count
        failure_count += 1
        await self.redis_client.setex(key, 900, failure_count)  # expires after 15 minutes

        # Decide whether the account should be locked
        should_lock = failure_count >= 5
        return failure_count, should_lock

    async def reset_failed_attempts(self, identifier: str):
        """Reset the failed-attempt counter."""
        key = f"login_failures:{identifier}"
        await self.redis_client.delete(key)

    async def lock_account(self, user_id: str, minutes: int = 15):
        """Lock an account for the given number of minutes."""
        key = f"account_lock:{user_id}"
        await self.redis_client.setex(key, minutes * 60, "locked")

    async def is_account_locked(self, user_id: str) -> bool:
        """Check whether an account is locked."""
        key = f"account_lock:{user_id}"
        return bool(await self.redis_client.exists(key))

6. Complete User Service Implementation

# src/services/user_service.py
import secrets
from datetime import datetime, timedelta
from typing import List, Optional

from sqlalchemy import delete, or_, select, update
from sqlalchemy.ext.asyncio import AsyncSession

from src.models import (
    User, UserSession, PasswordResetToken, EmailVerificationToken,
    UserRole, UserStatus,
)
from src.auth.password import PasswordManager
from src.schemas import UserCreate, UserLogin


class UserService:
    """User service."""

    def __init__(self, db: AsyncSession):
        self.db = db

    async def create_user(self, user_in: UserCreate) -> User:
        """Create a user."""
        hashed_password = PasswordManager.get_password_hash(user_in.password)
        user = User(
            email=user_in.email,
            username=user_in.username,
            full_name=user_in.full_name,
            hashed_password=hashed_password,
            status=UserStatus.INACTIVE,  # requires email verification
        )
        self.db.add(user)
        await self.db.commit()
        await self.db.refresh(user)

        # Create the email verification token
        await self.create_email_verification_token(user.id)
        return user

    async def get_user_by_id(self, user_id: str) -> Optional[User]:
        """Fetch a user by ID."""
        result = await self.db.execute(select(User).where(User.id == user_id))
        return result.scalar_one_or_none()

    async def get_user_by_email(self, email: str) -> Optional[User]:
        """Fetch a user by email address."""
        result = await self.db.execute(select(User).where(User.email == email))
        return result.scalar_one_or_none()

    async def get_user_by_username(self, username: str) -> Optional[User]:
        """Fetch a user by username."""
        result = await self.db.execute(select(User).where(User.username == username))
        return result.scalar_one_or_none()

    async def authenticate_user(self, identifier: str, password: str) -> Optional[User]:
        """Verify a user's credentials."""
        # Look the user up by email address or username
        result = await self.db.execute(
            select(User).where(or_(User.email == identifier, User.username == identifier))
        )
        user = result.scalar_one_or_none()
        if not user:
            return None

        # Verify the password
        if not PasswordManager.verify_password(password, user.hashed_password):
            return None
        return user

    async def update_login_info(
        self,
        user_id: str,
        login_success: bool,
        ip_address: Optional[str] = None,
    ):
        """Update login bookkeeping after a login attempt."""
        if login_success:
            # Successful login: reset the failure counter and any lock
            await self.db.execute(
                update(User).where(User.id == user_id).values(
                    last_login_at=datetime.utcnow(),
                    last_login_ip=ip_address,
                    failed_login_attempts=0,
                    locked_until=None,
                )
            )
        else:
            # Failed login: increment the failure counter
            await self.db.execute(
                update(User).where(User.id == user_id).values(
                    failed_login_attempts=User.failed_login_attempts + 1
                )
            )

            # Lock the account after 5 failures
            result = await self.db.execute(
                select(User.failed_login_attempts).where(User.id == user_id)
            )
            attempts = result.scalar_one()
            if attempts >= 5:
                lock_until = datetime.utcnow() + timedelta(minutes=15)
                await self.db.execute(
                    update(User).where(User.id == user_id).values(locked_until=lock_until)
                )
        await self.db.commit()

    async def update_password(self, user_id: str, new_password: str):
        """Update a user's password."""
        hashed_password = PasswordManager.get_password_hash(new_password)
        await self.db.execute(
            update(User).where(User.id == user_id).values(
                hashed_password=hashed_password,
                password_changed_at=datetime.utcnow(),
            )
        )
        await self.db.commit()

    async def create_password_reset_token(self, email: str) -> str:
        """Create a password reset token; only its hash is stored."""
        # Generate the token
        token = PasswordManager.generate_reset_token()
        token_hash = PasswordManager.create_reset_token_hash(token)

        # Store the reset token record
        reset_token = PasswordResetToken(
            email=email,
            token_hash=token_hash,
            expires_at=datetime.utcnow() + timedelta(hours=24),
        )
        self.db.add(reset_token)
        await self.db.commit()
        return token

    async def verify_password_reset_token(self, token: str) -> Optional[User]:
        """Validate a password reset token and return its user."""
        token_hash = PasswordManager.create_reset_token_hash(token)

        # Find a valid, unused, unexpired reset token
        result = await self.db.execute(
            select(PasswordResetToken).where(
                PasswordResetToken.token_hash == token_hash,
                PasswordResetToken.expires_at > datetime.utcnow(),
                PasswordResetToken.is_used.is_(False),
            )
        )
        reset_token = result.scalar_one_or_none()
        if not reset_token:
            return None
        # Mark the token as used
        reset_token.is_used = True
        reset_token.used_at = datetime.utcnow()

        # Fetch the user
        user = await self.get_user_by_email(reset_token.email)
        await self.db.commit()
        return user

    async def create_email_verification_token(self, user_id: str) -> str:
        """Create an email verification token; only its hash is stored."""
        token = secrets.token_urlsafe(32)
        token_hash = PasswordManager.create_reset_token_hash(token)

        # Store the verification token record
        verification_token = EmailVerificationToken(
            user_id=user_id,
            token_hash=token_hash,
            expires_at=datetime.utcnow() + timedelta(hours=24),
        )
        self.db.add(verification_token)
        await self.db.commit()
        return token

    async def verify_email_token(self, token: str) -> Optional[User]:
        """Validate an email verification token and activate the user."""
        token_hash = PasswordManager.create_reset_token_hash(token)

        # Find a valid, unused, unexpired verification token
        result = await self.db.execute(
            select(EmailVerificationToken).where(
                EmailVerificationToken.token_hash == token_hash,
                EmailVerificationToken.expires_at > datetime.utcnow(),
                EmailVerificationToken.is_used.is_(False),
            )
        )
        verification_token = result.scalar_one_or_none()
        if not verification_token:
            return None

        # Mark the token as used
        verification_token.is_used = True
        verification_token.used_at = datetime.utcnow()

        # Activate the user
        await self.db.execute(
            update(User).where(User.id == verification_token.user_id).values(
                is_email_verified=True,
                status=UserStatus.ACTIVE,
            )
        )

        # Fetch the user
        user = await self.get_user_by_id(verification_token.user_id)
        await self.db.commit()
        return user

    async def enable_2fa(self, user_id: str, secret: str):
        """Enable two-factor authentication."""
        await self.db.execute(
            update(User).where(User.id == user_id).values(
                is_2fa_enabled=True,
                two_factor_secret=secret,
            )
        )
        await self.db.commit()

    async def disable_2fa(self, user_id: str):
        """Disable two-factor authentication."""
        await self.db.execute(
            update(User).where(User.id == user_id).values(
                is_2fa_enabled=False,
                two_factor_secret=None,
            )
        )
        await self.db.commit()

    async def create_session(
        self,
        user_id: str,
        user_agent: Optional[str] = None,
        ip_address: Optional[str] = None,
    ) -> str:
        """Create a user session."""
        from src.services.session_service import SessionService
        session_service = SessionService(self.db)
        return await session_service.create_session(
            user_id=user_id,
            user_agent=user_agent,
            ip_address=ip_address,
        )

    async def invalidate_all_sessions(
        self,
        user_id: str,
        exclude_current: bool = False,
        current_session_token: Optional[str] = None,
    ):
        """Invalidate all of a user's sessions.

        SessionService.revoke_all_sessions expects the session token to spare, not
        a flag, so the current session is only kept when its token is passed in via
        current_session_token (a parameter added here for that purpose).
        """
        from src.services.session_service import SessionService

        session_service = SessionService(self.db)
        return await session_service.revoke_all_sessions(
            user_id=user_id,
            exclude_current=current_session_token if exclude_current else None,
        )

    async def get_users(
        self,
        skip: int = 0,
        limit: int = 100,
        role: Optional[UserRole] = None,
        status: Optional[UserStatus] = None,
        search: Optional[str] = None,
    ) -> List[User]:
        """List users with optional filters."""
        query = select(User)

        # Apply filters
        if role:
            query = query.where(User.role == role)
        if status:
            query = query.where(User.status == status)
        if search:
            search_filter = or_(
                User.email.ilike(f"%{search}%"),
                User.username.ilike(f"%{search}%"),
                User.full_name.ilike(f"%{search}%"),
            )
            query = query.where(search_filter)

        # Apply ordering and pagination
        query = query.order_by(User.created_at.desc()).offset(skip).limit(limit)
        result = await self.db.execute(query)
        return result.scalars().all()

    async def update_user_role(self, user_id: str, new_role: UserRole) -> bool:
        """Update a user's role."""
        result = await self.db.execute(
            update(User).where(User.id == user_id).values(role=new_role)
        )
        await self.db.commit()
        return result.rowcount > 0

    async def update_user_status(self, user_id: str, new_status: UserStatus) -> bool:
        """Update a user's status."""
        result = await self.db.execute(
            update(User).where(User.id == user_id).values(status=new_status)
        )
        await self.db.commit()
        return result.rowcount > 0

    async def delete_user(self, user_id: str) -> bool:
        """Delete a user (soft delete)."""
        result = await self.db.execute(
            update(User).where(User.id == user_id).values(
                status=UserStatus.DELETED,
                email=f"deleted_{user_id}@deleted.com",  # rewritten to free the address
                username=f"deleted_{user_id}",
            )
        )
        await self.db.commit()
        return result.rowcount > 0

7. Test Cases

# tests/test_auth.py
# The client and db fixtures are assumed to be provided by the project's conftest.py.
from datetime import datetime, timedelta

import jwt
import pytest
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession

from src.main import app
from src.models import User, UserStatus
from src.auth.jwt_handler import jwt_manager
from src.auth.password import PasswordManager
from src.services.user_service import UserService


@pytest.mark.asyncio
class TestAuthentication:
    """Authentication tests."""

    async def test_register_success(self, client: AsyncClient, db: AsyncSession):
        """Registration succeeds."""
        user_data = {
            "email": "test@example.com",
            "username": "testuser",
            "password": "SecurePass123!",
            "full_name": "Test User",
        }
        response = await client.post("/api/v1/auth/register", json=user_data)
        assert response.status_code == 201
        data = response.json()
        assert data["email"] == user_data["email"]
        assert data["username"] == user_data["username"]
        assert data["status"] == UserStatus.INACTIVE.value
        assert "id" in data

    async def test_register_duplicate_email(self, client: AsyncClient, db: AsyncSession):
        """Registering a duplicate email address fails."""
        # Register a first user
        user_data = {
            "email": "duplicate@example.com",
            "username": "user1",
            "password": "SecurePass123!",
        }
        await client.post("/api/v1/auth/register", json=user_data)

        # Try to register again with the same email address
        duplicate_data = {
            "email": "duplicate@example.com",
            "username": "user2",
            "password": "AnotherPass123!",
        }
        response = await client.post("/api/v1/auth/register", json=duplicate_data)
        assert response.status_code == 400
        assert "邮箱已被注册" in response.json()["detail"]

    async def test_login_success(self, client: AsyncClient, db: AsyncSession):
        """Login succeeds."""
        # Register a user first
        user_data = {
            "email": "login@example.com",
            "username": "loginuser",
            "password": "SecurePass123!",
        }
        await client.post("/api/v1/auth/register", json=user_data)

        # Activate the user
        user_service = UserService(db)
        user = await user_service.get_user_by_email("login@example.com")
        user.status = UserStatus.ACTIVE
        user.is_email_verified = True
        await db.commit()
        # Log in
        login_data = {"username": "loginuser", "password": "SecurePass123!"}
        response = await client.post(
            "/api/v1/auth/login",
            data=login_data,
            headers={"Content-Type": "application/x-www-form-urlencoded"},
        )
        assert response.status_code == 200
        data = response.json()
        assert "access_token" in data
        assert "refresh_token" in data
        assert data["token_type"] == "bearer"

    async def test_login_invalid_credentials(self, client: AsyncClient, db: AsyncSession):
        """Login with invalid credentials fails."""
        login_data = {"username": "nonexistent", "password": "wrongpassword"}
        response = await client.post(
            "/api/v1/auth/login",
            data=login_data,
            headers={"Content-Type": "application/x-www-form-urlencoded"},
        )
        assert response.status_code == 401
        assert "用户名或密码错误" in response.json()["detail"]

    async def test_refresh_token(self, client: AsyncClient, db: AsyncSession):
        """Refreshing a token succeeds."""
        # Register a user so there is an account to log in with
        user_data = {
            "email": "refresh@example.com",
            "username": "refreshuser",
            "password": "SecurePass123!",
        }
        await client.post("/api/v1/auth/register", json=user_data)

        # Activate the user
        user_service = UserService(db)
        user = await user_service.get_user_by_email("refresh@example.com")
        user.status = UserStatus.ACTIVE
        user.is_email_verified = True
        await db.commit()

        # Log in to obtain tokens
        login_data = {"username": "refreshuser", "password": "SecurePass123!"}
        login_response = await client.post(
            "/api/v1/auth/login",
            data=login_data,
            headers={"Content-Type": "application/x-www-form-urlencoded"},
        )
        refresh_token = login_response.json()["refresh_token"]

        # Refresh the token
        response = await client.post(
            "/api/v1/auth/refresh",
            json={"refresh_token": refresh_token},
        )
        assert response.status_code == 200
        data = response.json()
        assert "access_token" in data
        assert data["token_type"] == "bearer"

    async def test_protected_endpoint(self, client: AsyncClient, db: AsyncSession):
        """A protected endpoint is reachable with a valid token."""
        # Register and activate a user
        user_data = {
            "email": "protected@example.com",
            "username": "protecteduser",
            "password": "SecurePass123!",
        }
        await client.post("/api/v1/auth/register", json=user_data)

        # Activate the user
        user_service = UserService(db)
        user = await user_service.get_user_by_email("protected@example.com")
        user.status = UserStatus.ACTIVE
        user.is_email_verified = True
        await db.commit()
        # Log in to obtain tokens
        login_data = {"username": "protecteduser", "password": "SecurePass123!"}
        login_response = await client.post(
            "/api/v1/auth/login",
            data=login_data,
            headers={"Content-Type": "application/x-www-form-urlencoded"},
        )
        access_token = login_response.json()["access_token"]

        # Call the protected endpoint
        response = await client.get(
            "/api/v1/auth/me",
            headers={"Authorization": f"Bearer {access_token}"},
        )
        assert response.status_code == 200
        data = response.json()
        assert data["email"] == user_data["email"]
        assert data["username"] == user_data["username"]

    async def test_protected_endpoint_no_token(self, client: AsyncClient):
        """A protected endpoint rejects requests without a token."""
        response = await client.get("/api/v1/auth/me")
        assert response.status_code == 401
        assert "需要认证" in response.json()["detail"]

    async def test_logout(self, client: AsyncClient, db: AsyncSession):
        """Logout revokes the token."""
        # Register and activate a user
        user_data = {
            "email": "logout@example.com",
            "username": "logoutuser",
            "password": "SecurePass123!",
        }
        await client.post("/api/v1/auth/register", json=user_data)

        # Activate the user
        user_service = UserService(db)
        user = await user_service.get_user_by_email("logout@example.com")
        user.status = UserStatus.ACTIVE
        user.is_email_verified = True
        await db.commit()

        # Log in to obtain tokens
        login_data = {"username": "logoutuser", "password": "SecurePass123!"}
        login_response = await client.post(
            "/api/v1/auth/login",
            data=login_data,
            headers={"Content-Type": "application/x-www-form-urlencoded"},
        )
        access_token = login_response.json()["access_token"]

        # Log out
        response = await client.post(
            "/api/v1/auth/logout",
            headers={"Authorization": f"Bearer {access_token}"},
        )
        assert response.status_code == 200
        assert response.json()["message"] == "登出成功"

        # The revoked token must no longer work
        response = await client.get(
            "/api/v1/auth/me",
            headers={"Authorization": f"Bearer {access_token}"},
        )
        assert response.status_code == 401


@pytest.mark.asyncio
class TestOAuth:
    """OAuth tests."""

    async def test_oauth_login_url(self, client: AsyncClient):
        """The OAuth login URL is returned."""
        response = await client.get(
            "/api/v1/auth/oauth/google/login",
            params={"redirect_uri": "http://localhost:3000/callback"},
        )
        assert response.status_code == 200
        data = response.json()
        assert "authorization_url" in data
        assert "state" in data
        assert "google.com" in data["authorization_url"]


@pytest.mark.asyncio
class TestPasswordReset:
    """Password reset tests."""

    async def test_password_reset_flow(self, client: AsyncClient, db: AsyncSession):
        """The full password reset flow works."""
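        # 1. Register the user
        user_data = {
            "email": "reset@example.com",
            "username": "resetuser",
            "password": "OldPass123!",
        }
        await client.post("/api/v1/auth/register", json=user_data)

        # 2. Request a password reset
        reset_request = {"email": "reset@example.com"}
        response = await client.post("/api/v1/auth/password/reset/request", json=reset_request)
        assert response.status_code == 200

        # 3. Obtain a reset token. This is simplified: in practice the token
        #    would be taken from the email.
        user_service = UserService(db)
        user = await user_service.get_user_by_email("reset@example.com")

        # Activate the account so that the final login is not rejected as inactive
        user.status = UserStatus.ACTIVE
        user.is_email_verified = True
        await db.commit()

        # Create the reset token
        reset_token = await user_service.create_password_reset_token(user.email)

        # 4. Reset the password with the token
        reset_confirm = {"token": reset_token, "new_password": "NewPass456!"}
        response = await client.post("/api/v1/auth/password/reset/confirm", json=reset_confirm)
        assert response.status_code == 200

        # 5. Log in with the new password
        login_data = {"username": "resetuser", "password": "NewPass456!"}
        response = await client.post(
            "/api/v1/auth/login",
            data=login_data,
            headers={"Content-Type": "application/x-www-form-urlencoded"},
        )
        assert response.status_code == 200

8. Deployment Configuration

8.1 Environment Variables

# .env.example
# Application
APP_NAME=FastAPI Authentication
APP_VERSION=1.0.0
ENVIRONMENT=production
DEBUG=false

# Security
SECRET_KEY=your-secret-key-here-change-in-production
ALGORITHM=HS256
ACCESS_TOKEN_EXPIRE_MINUTES=30
REFRESH_TOKEN_EXPIRE_DAYS=7

# Database
DATABASE_URL=postgresql+asyncpg://user:password@localhost/dbname

# Redis
REDIS_URL=redis://localhost:6379/0

# OAuth
GOOGLE_CLIENT_ID=
GOOGLE_CLIENT_SECRET=
GITHUB_CLIENT_ID=
GITHUB_CLIENT_SECRET=
FACEBOOK_CLIENT_ID=
FACEBOOK_CLIENT_SECRET=

# Email
SMTP_HOST=smtp.gmail.com
SMTP_PORT=587
SMTP_USER=
SMTP_PASSWORD=
EMAILS_FROM_EMAIL=noreply@example.com

# Frontend URL (used for CORS)
FRONTEND_URL=https://yourfrontend.com

8.2 Docker Configuration

# Dockerfile
FROM python:3.9-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    libpq-dev \
    && rm -rf /var/lib/apt/lists/*

# Copy the dependency manifest
COPY requirements.txt .

# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code
COPY . .

# Create a non-root user
RUN useradd -m -u 1000 fastapi && chown -R fastapi:fastapi /app
USER fastapi

# Run the application
CMD ["uvicorn", "src.main:app", "--host", "0.0.0.0", "--port", "8000"]

9. Performance Optimization and Security Recommendations

9.1 Performance Optimization

- Token verification caching: cache the result of validating a JWT so the same token is not decoded repeatedly.
- Database indexes: make sure the key lookup columns of the user table (such as email and username) are indexed.
- Redis connection pooling: manage Redis connections through a connection pool.
- Asynchronous processing: run slow operations such as sending email as background tasks.
- CDN caching: serve static assets through a CDN.

9.2 Security Recommendations

- Use HTTPS: production deployments must use HTTPS.
- Rotate keys regularly: rotate the JWT signing key on a schedule.
- Monitor anomalous logins: watch for unusual login attempts.
- Keep an audit log: record important security events.
- Scan regularly: run security vulnerability scans on a schedule.

10. Summary

This article walked through a complete JWT authentication and OAuth2 integration in FastAPI. With this system you can:

- Implement secure user authentication with stateless JWTs.
- Support several login methods: password login and OAuth login.
- Provide a full set of security features, including two-factor authentication, session management, and password policy.
- Apply fine-grained permission control through role-based access control.
- Protect user data through multiple layers of security measures.

The result is a secure, extensible authentication and authorization foundation for modern web applications that can be extended and customized to fit specific requirements.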
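
The key-rotation recommendation in section 9.2 can be illustrated with a small, standard-library-only sketch. It is not the article's `jwt_manager`; the key ring `KEYS`, the `ACTIVE_KID` constant, and the `encode`/`decode` helpers are hypothetical names introduced here. The idea is that each token carries a `kid` (key ID) header, new tokens are signed with the active key, and tokens signed with an older key keep verifying until that key is removed from the ring.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional

# Hypothetical key ring: key id -> secret. Real secrets would come from a
# secrets manager or environment variables, never from source code.
KEYS = {"2025-01": b"old-secret", "2025-07": b"new-secret"}
ACTIVE_KID = "2025-07"  # new tokens are signed with this key


def _b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWTs require."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    """Decode base64url, restoring the stripped padding."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _sign(signing_input: str, secret: bytes) -> str:
    """HMAC-SHA256 over 'header.payload', base64url-encoded."""
    digest = hmac.new(secret, signing_input.encode("ascii"), hashlib.sha256).digest()
    return _b64url(digest)


def encode(payload: dict, kid: str = ACTIVE_KID) -> str:
    """Create an HS256 token whose header names the signing key."""
    header = {"alg": "HS256", "typ": "JWT", "kid": kid}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    return f"{signing_input}.{_sign(signing_input, KEYS[kid])}"


def decode(token: str, now: Optional[float] = None) -> dict:
    """Verify the signature with the key named by 'kid', then check 'exp'."""
    try:
        header_b64, payload_b64, signature = token.split(".")
        header = json.loads(_b64url_decode(header_b64))
        payload = json.loads(_b64url_decode(payload_b64))
    except (ValueError, TypeError) as exc:
        raise ValueError("malformed token") from exc
    if not isinstance(header, dict) or not isinstance(payload, dict):
        raise ValueError("malformed token")
    if header.get("alg") != "HS256":
        raise ValueError("unexpected algorithm")
    kid = header.get("kid")
    secret = KEYS.get(kid) if isinstance(kid, str) else None
    if secret is None:
        raise ValueError("unknown key id")  # the key was retired or never existed
    expected = _sign(f"{header_b64}.{payload_b64}", secret)
    if not hmac.compare_digest(expected.encode("utf-8"), signature.encode("utf-8")):
        raise ValueError("bad signature")
    current = time.time() if now is None else now
    if payload.get("exp", 0) <= current:
        raise ValueError("token expired")
    return payload
```

To rotate, add a new entry to the key ring, point `ACTIVE_KID` at it, and delete the previous entry once the longest-lived token signed with it has expired (7 days for refresh tokens under the configuration shown earlier). A production system would normally rely on a maintained JWT library rather than hand-rolled signing; the sketch only shows where the `kid` lookup fits.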
