Coverage for src/accounts/views/user_credential.py: 62%

79 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-09-21 16:24 +0300

1""" 

2This module defines API views for user registration, login, logout, password change, password reset and account deletion. 

3""" 

4 

5from django.contrib.auth import get_user_model, authenticate 

6 

7from rest_framework.generics import CreateAPIView, DestroyAPIView 

8from rest_framework.permissions import AllowAny 

9from rest_framework.views import APIView 

10from rest_framework.response import Response 

11from rest_framework import status 

12from rest_framework_simplejwt.tokens import RefreshToken 

13from rest_framework_simplejwt.exceptions import TokenError 

14from drf_spectacular.utils import extend_schema 

15 

16from src.accounts.serializers.user_credential import ( 

17 UserPasswordResetConfirmSerializer, 

18 UserPasswordResetRequestSerializer, 

19 UserRegisterSerializer, 

20 UserLoginRequestSerializer, 

21 UserPasswordChangeSerializer, 

22) 

23from src.accounts.constants import UserErrorMessages, UserSuccessMessages 

24 

25UserModel = get_user_model() 

26 

27 

28class UserRegisterView(CreateAPIView): 

29 """ 

30 Uses a signal to create related UserProfile and UserPhoto models. 

31 """ 

32 

33 queryset = UserModel.objects.all() 

34 serializer_class = UserRegisterSerializer 

35 permission_classes = [AllowAny] 

36 

37 def create(self, request, *args, **kwargs): 

38 # Validate incoming registration data 

39 serializer = self.get_serializer(data=request.data) 

40 serializer.is_valid(raise_exception=True) 

41 

42 # Save the new user instance 

43 user = serializer.save() 

44 

45 # Issue JWT refresh and access tokens for the new user 

46 refresh = RefreshToken.for_user(user) 

47 

48 # Return tokens and user info to the frontend 

49 

50 return Response( 

51 { 

52 'refresh': str(refresh), 

53 'access': str(refresh.access_token), 

54 'email': user.email, 

55 'username': user.username, 

56 'id': user.pk, 

57 }, 

58 status=status.HTTP_201_CREATED, 

59 ) 

60 

61 

62class UserLoginView(APIView): 

63 """ 

64 - Accepts email or username and password. 

65 - Authenticates the user and issues JWT tokens. 

66 - Allows any user to attempt login. 

67 """ 

68 

69 permission_classes = [AllowAny] 

70 

71 @extend_schema(request=UserLoginRequestSerializer) 

72 def post(self, request, *args, **kwargs): 

73 # Validate login credentials 

74 serializer = UserLoginRequestSerializer(data=request.data) 

75 if not serializer.is_valid(): 

76 return Response( 

77 serializer.errors, 

78 status=status.HTTP_400_BAD_REQUEST, 

79 ) 

80 

81 # Extract validated credentials 

82 email_or_username = serializer.validated_data['email_or_username'] 

83 password = serializer.validated_data['password'] 

84 

85 # Authenticate user (returns user instance or None) 

86 user = authenticate( 

87 username=email_or_username, 

88 password=password, 

89 ) 

90 

91 if user is None: 

92 # Invalid credentials 

93 return Response( 

94 { 

95 'error': UserErrorMessages.INCORRECT_CREDENTIALS, 

96 }, 

97 status=status.HTTP_401_UNAUTHORIZED, 

98 ) 

99 

100 # Issue JWT tokens 

101 refresh = RefreshToken.for_user(user) 

102 

103 # Collect user permissions for frontend role-based functionality 

104 permissions = [] 

105 if user.has_perm('products.approve_review'): 

106 permissions.append('products.approve_review') 

107 

108 # Return tokens, user info, and permissions 

109 

110 return Response( 

111 { 

112 'refresh': str(refresh), 

113 'access': str(refresh.access_token), 

114 'message': 'Login successful', 

115 'username': user.username, 

116 'email': user.email, 

117 'id': user.pk, 

118 'permissions': permissions, 

119 }, 

120 status=status.HTTP_200_OK, 

121 ) 

122 

123 

124class UserLogoutView(APIView): 

125 """ 

126 - Accepts a refresh token and blacklists it (invalidates the session). 

127 - Returns a success or error message. 

128 """ 

129 

130 permission_classes = [AllowAny] 

131 

132 def post(self, request, *args, **kwargs): 

133 try: 

134 # Get refresh token from request data 

135 refresh_token = request.data.get('refresh') 

136 token = RefreshToken(refresh_token) 

137 

138 # Blacklist the token (requires blacklist app enabled) 

139 token.blacklist() 

140 

141 return Response( 

142 { 

143 'message': UserSuccessMessages.LOGOUT_SUCCESS, 

144 }, 

145 status=status.HTTP_200_OK, 

146 ) 

147 

148 except TokenError: 

149 # Token is invalid or already expired/blacklisted 

150 return Response( 

151 { 

152 'error': UserErrorMessages.INVALID_TOKEN, 

153 }, 

154 status=status.HTTP_400_BAD_REQUEST, 

155 ) 

156 

157 

158class UserPasswordChangeView(APIView): 

159 """ 

160 - Only accessible to authenticated users. 

161 - Validates current and new passwords. 

162 - Saves the new password if validation passes. 

163 """ 

164 @extend_schema(request=UserPasswordChangeSerializer) 

165 def patch(self, request): 

166 # Validate current and new passwords 

167 serializer = UserPasswordChangeSerializer( 

168 data=request.data, 

169 context={ 

170 'request': request, 

171 }, 

172 ) 

173 

174 if serializer.is_valid(): 

175 serializer.save() 

176 

177 return Response( 

178 { 

179 'message': UserSuccessMessages.PASSWORD_CHANGED, 

180 }, 

181 status=status.HTTP_200_OK, 

182 ) 

183 

184 return Response( 

185 serializer.errors, 

186 status=status.HTTP_400_BAD_REQUEST, 

187 ) 

188 

189 

190class UserDeleteView(DestroyAPIView): 

191 """ 

192 - Only accessible to authenticated users. 

193 - Deletes the user instance associated with the current request. 

194 """ 

195 @extend_schema( 

196 summary='Delete user', 

197 description='Deletes the logged in user' 

198 ) 

199 def get_object(self): 

200 # Return the current user instance for deletion 

201 

202 return self.request.user 

203 

204 

205class UserPasswordResetRequestView(APIView): 

206 """Send password reset email""" 

207 

208 permission_classes = [AllowAny] 

209 

210 @extend_schema(request=UserPasswordResetRequestSerializer) 

211 def post(self, request): 

212 serializer = UserPasswordResetRequestSerializer(data=request.data) 

213 

214 if serializer.is_valid(): 

215 serializer.save() 

216 

217 return Response( 

218 { 

219 'message': UserSuccessMessages.RESET_LINK_SENT, 

220 }, 

221 status=status.HTTP_200_OK, 

222 ) 

223 

224 return Response( 

225 serializer.errors, 

226 status=status.HTTP_400_BAD_REQUEST, 

227 ) 

228 

229 

230class UserPasswordResetConfirmView(APIView): 

231 """Reset password with token""" 

232 

233 permission_classes = [AllowAny] 

234 

235 @extend_schema(request=UserPasswordResetConfirmSerializer) 

236 def post(self, request): 

237 serializer = UserPasswordResetConfirmSerializer(data=request.data) 

238 if serializer.is_valid(): 

239 serializer.save() 

240 

241 return Response( 

242 { 

243 'message': UserSuccessMessages.PASSWORD_RESET, 

244 }, 

245 status=status.HTTP_200_OK, 

246 ) 

247 

248 return Response( 

249 serializer.errors, 

250 status=status.HTTP_400_BAD_REQUEST, 

251 )