Coverage for src/accounts/views/user_credential.py: 62%
79 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-09-21 16:24 +0300
« prev ^ index » next coverage.py v7.9.2, created at 2025-09-21 16:24 +0300
1"""
2This module defines API views for user registration, login, logout, password change, password reset and account deletion.
3"""
5from django.contrib.auth import get_user_model, authenticate
7from rest_framework.generics import CreateAPIView, DestroyAPIView
8from rest_framework.permissions import AllowAny
9from rest_framework.views import APIView
10from rest_framework.response import Response
11from rest_framework import status
12from rest_framework_simplejwt.tokens import RefreshToken
13from rest_framework_simplejwt.exceptions import TokenError
14from drf_spectacular.utils import extend_schema
16from src.accounts.serializers.user_credential import (
17 UserPasswordResetConfirmSerializer,
18 UserPasswordResetRequestSerializer,
19 UserRegisterSerializer,
20 UserLoginRequestSerializer,
21 UserPasswordChangeSerializer,
22)
23from src.accounts.constants import UserErrorMessages, UserSuccessMessages
25UserModel = get_user_model()
28class UserRegisterView(CreateAPIView):
29 """
30 Uses a signal to create related UserProfile and UserPhoto models.
31 """
33 queryset = UserModel.objects.all()
34 serializer_class = UserRegisterSerializer
35 permission_classes = [AllowAny]
37 def create(self, request, *args, **kwargs):
38 # Validate incoming registration data
39 serializer = self.get_serializer(data=request.data)
40 serializer.is_valid(raise_exception=True)
42 # Save the new user instance
43 user = serializer.save()
45 # Issue JWT refresh and access tokens for the new user
46 refresh = RefreshToken.for_user(user)
48 # Return tokens and user info to the frontend
50 return Response(
51 {
52 'refresh': str(refresh),
53 'access': str(refresh.access_token),
54 'email': user.email,
55 'username': user.username,
56 'id': user.pk,
57 },
58 status=status.HTTP_201_CREATED,
59 )
62class UserLoginView(APIView):
63 """
64 - Accepts email or username and password.
65 - Authenticates the user and issues JWT tokens.
66 - Allows any user to attempt login.
67 """
69 permission_classes = [AllowAny]
71 @extend_schema(request=UserLoginRequestSerializer)
72 def post(self, request, *args, **kwargs):
73 # Validate login credentials
74 serializer = UserLoginRequestSerializer(data=request.data)
75 if not serializer.is_valid():
76 return Response(
77 serializer.errors,
78 status=status.HTTP_400_BAD_REQUEST,
79 )
81 # Extract validated credentials
82 email_or_username = serializer.validated_data['email_or_username']
83 password = serializer.validated_data['password']
85 # Authenticate user (returns user instance or None)
86 user = authenticate(
87 username=email_or_username,
88 password=password,
89 )
91 if user is None:
92 # Invalid credentials
93 return Response(
94 {
95 'error': UserErrorMessages.INCORRECT_CREDENTIALS,
96 },
97 status=status.HTTP_401_UNAUTHORIZED,
98 )
100 # Issue JWT tokens
101 refresh = RefreshToken.for_user(user)
103 # Collect user permissions for frontend role-based functionality
104 permissions = []
105 if user.has_perm('products.approve_review'):
106 permissions.append('products.approve_review')
108 # Return tokens, user info, and permissions
110 return Response(
111 {
112 'refresh': str(refresh),
113 'access': str(refresh.access_token),
114 'message': 'Login successful',
115 'username': user.username,
116 'email': user.email,
117 'id': user.pk,
118 'permissions': permissions,
119 },
120 status=status.HTTP_200_OK,
121 )
124class UserLogoutView(APIView):
125 """
126 - Accepts a refresh token and blacklists it (invalidates the session).
127 - Returns a success or error message.
128 """
130 permission_classes = [AllowAny]
132 def post(self, request, *args, **kwargs):
133 try:
134 # Get refresh token from request data
135 refresh_token = request.data.get('refresh')
136 token = RefreshToken(refresh_token)
138 # Blacklist the token (requires blacklist app enabled)
139 token.blacklist()
141 return Response(
142 {
143 'message': UserSuccessMessages.LOGOUT_SUCCESS,
144 },
145 status=status.HTTP_200_OK,
146 )
148 except TokenError:
149 # Token is invalid or already expired/blacklisted
150 return Response(
151 {
152 'error': UserErrorMessages.INVALID_TOKEN,
153 },
154 status=status.HTTP_400_BAD_REQUEST,
155 )
158class UserPasswordChangeView(APIView):
159 """
160 - Only accessible to authenticated users.
161 - Validates current and new passwords.
162 - Saves the new password if validation passes.
163 """
164 @extend_schema(request=UserPasswordChangeSerializer)
165 def patch(self, request):
166 # Validate current and new passwords
167 serializer = UserPasswordChangeSerializer(
168 data=request.data,
169 context={
170 'request': request,
171 },
172 )
174 if serializer.is_valid():
175 serializer.save()
177 return Response(
178 {
179 'message': UserSuccessMessages.PASSWORD_CHANGED,
180 },
181 status=status.HTTP_200_OK,
182 )
184 return Response(
185 serializer.errors,
186 status=status.HTTP_400_BAD_REQUEST,
187 )
190class UserDeleteView(DestroyAPIView):
191 """
192 - Only accessible to authenticated users.
193 - Deletes the user instance associated with the current request.
194 """
195 @extend_schema(
196 summary='Delete user',
197 description='Deletes the logged in user'
198 )
199 def get_object(self):
200 # Return the current user instance for deletion
202 return self.request.user
205class UserPasswordResetRequestView(APIView):
206 """Send password reset email"""
208 permission_classes = [AllowAny]
210 @extend_schema(request=UserPasswordResetRequestSerializer)
211 def post(self, request):
212 serializer = UserPasswordResetRequestSerializer(data=request.data)
214 if serializer.is_valid():
215 serializer.save()
217 return Response(
218 {
219 'message': UserSuccessMessages.RESET_LINK_SENT,
220 },
221 status=status.HTTP_200_OK,
222 )
224 return Response(
225 serializer.errors,
226 status=status.HTTP_400_BAD_REQUEST,
227 )
230class UserPasswordResetConfirmView(APIView):
231 """Reset password with token"""
233 permission_classes = [AllowAny]
235 @extend_schema(request=UserPasswordResetConfirmSerializer)
236 def post(self, request):
237 serializer = UserPasswordResetConfirmSerializer(data=request.data)
238 if serializer.is_valid():
239 serializer.save()
241 return Response(
242 {
243 'message': UserSuccessMessages.PASSWORD_RESET,
244 },
245 status=status.HTTP_200_OK,
246 )
248 return Response(
249 serializer.errors,
250 status=status.HTTP_400_BAD_REQUEST,
251 )