Build the Token Verifier

Write get_current_user — the function that decodes tokens and finds users

The gatekeeper function

get_current_user is the function that will guard every expense endpoint. In Lesson 3, you will add it as a parameter to each endpoint, and FastAPI will run it automatically before the endpoint code executes. If verification fails at any point, the endpoint never runs and the client receives a 401 response.

The function takes two parameters that FastAPI provides automatically:

  • token: the JWT string extracted from the Authorization header by oauth2_scheme
  • session: a database session — the same dependency your endpoints already use

You tell FastAPI what to provide using the Annotated[type, Depends(function)] pattern, the same pattern behind SessionDep from the database course. FastAPI calls the function named inside Depends and passes its result in as the argument.
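To make the mechanism concrete, here is a toy model of dependency resolution built only on the standard library. It is not how FastAPI is implemented: ToyDepends, get_fake_session, read_expenses, and call_with_dependencies are hypothetical names invented for this sketch. It only illustrates the idea that a marker stored in Annotated metadata tells a caller which function to run for each parameter.

```python
import inspect
from typing import Annotated, Callable, get_type_hints


class ToyDepends:
    """Hypothetical stand-in for fastapi.Depends: it only remembers a callable."""

    def __init__(self, dependency: Callable[[], object]) -> None:
        self.dependency = dependency


def get_fake_session() -> str:
    # Stand-in for get_session; the real one provides a database session.
    return "fake-session"


def read_expenses(session: Annotated[str, ToyDepends(get_fake_session)]) -> str:
    # The function body just uses the value it was handed.
    return f"querying with {session}"


def call_with_dependencies(func: Callable[..., object]) -> object:
    """Call func, filling each parameter that carries a ToyDepends marker."""
    hints = get_type_hints(func, include_extras=True)
    kwargs = {}
    for name in inspect.signature(func).parameters:
        # Annotated[...] exposes its extra arguments as __metadata__.
        for marker in getattr(hints.get(name), "__metadata__", ()):
            if isinstance(marker, ToyDepends):
                kwargs[name] = marker.dependency()
    return func(**kwargs)


result = call_with_dependencies(read_expenses)
print(result)
```

The caller never passes session itself. The resolver reads the annotation, runs the dependency, and supplies the value, which is the role FastAPI plays for token and session.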

Three checks, one error message

The function performs three checks in sequence. If any check fails, it raises a 401 with "Invalid token":

  • Decode the token: use the secret key to verify the signature and check that the token has not expired. If the token is corrupted or expired, jwt.decode raises a JWTError.
  • Extract the email: read the "sub" field from the decoded payload. If it is missing, the token is malformed.
  • Look up the user: query the database by email. If the user does not exist — for example, the account was deleted after the token was issued — the token is no longer valid.
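The three checks can be sketched without any third-party library. The code below is a simplified stand-in, not the jwt.decode used in this lesson: it hand-rolls HS256 signing with hmac, uses a plain dict in place of the database, and raises a hypothetical InvalidToken exception where the real function raises a 401. SECRET_KEY, make_token, verify_token, and is_rejected are assumptions made for illustration, and this sketch treats a missing expiry as invalid.

```python
import base64
import hashlib
import hmac
import json
import time

SECRET_KEY = b"demo-secret"  # assumption: placeholder key for this sketch only


class InvalidToken(Exception):
    """The single error every failed check maps to (the 401 in the lesson)."""


def _b64url_encode(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def _b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _sign(signing_input: str) -> bytes:
    return hmac.new(SECRET_KEY, signing_input.encode(), hashlib.sha256).digest()


def make_token(payload: dict) -> str:
    """Build an HS256-signed token so the sketch has something to verify."""
    header = _b64url_encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    body = _b64url_encode(json.dumps(payload).encode())
    signature = _b64url_encode(_sign(f"{header}.{body}"))
    return f"{header}.{body}.{signature}"


def verify_token(token: str, users: dict) -> dict:
    # Check 1: decode the token. Verify the signature, then the expiry.
    try:
        header, body, signature = token.split(".")
        expected = _sign(f"{header}.{body}")
        if not hmac.compare_digest(expected, _b64url_decode(signature)):
            raise InvalidToken("Invalid token")
        payload = json.loads(_b64url_decode(body))
    except ValueError:
        # Wrong number of parts, bad base64, or bad JSON all end up here.
        raise InvalidToken("Invalid token") from None
    exp = payload.get("exp")
    if not isinstance(exp, (int, float)) or exp < time.time():
        raise InvalidToken("Invalid token")

    # Check 2: extract the email from the "sub" field.
    email = payload.get("sub")
    if email is None:
        raise InvalidToken("Invalid token")

    # Check 3: look up the user (a dict stands in for the database).
    user = users.get(email)
    if user is None:
        raise InvalidToken("Invalid token")
    return user


def is_rejected(token: str, users: dict) -> bool:
    try:
        verify_token(token, users)
    except InvalidToken:
        return True
    return False


users = {"ada@example.com": {"email": "ada@example.com"}}
soon = time.time() + 60
good = make_token({"sub": "ada@example.com", "exp": soon})
print(verify_token(good, users))
print(is_rejected(make_token({"sub": "ghost@example.com", "exp": soon}), users))
```

Every failure path raises the same error with the same message, so a caller cannot tell which check failed. That mirrors the single "Invalid token" response described above.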

Instructions

Add the get_current_user function to auth.py.

  1. Define a function called get_current_user that returns a User. It takes two parameters that FastAPI fills in automatically:
    • token: Annotated[str, Depends(oauth2_scheme)]. FastAPI calls oauth2_scheme to extract the token from the Authorization header and passes the token string here.
    • session: Annotated[Session, Depends(get_session)]. FastAPI opens a database session, the same way it does for your endpoints.
  2. Start a try block. Inside it, decode the token: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) — this verifies the signature and checks that the token has not expired.
  3. Still inside the try block, extract the email: email = payload.get("sub"). If email is None, the token is missing the subject field, so raise HTTPException(status_code=401, detail="Invalid token"). HTTPException is not a JWTError, so the except block in the next step does not intercept it.
  4. Add an except JWTError block that raises HTTPException(status_code=401, detail="Invalid token") — this catches expired tokens, tampered signatures, and any other decoding failure.
  5. After the try/except block, look up the user in the database: user = session.exec(select(User).where(User.email == email)).first().
  6. If user is None, the account may have been deleted after the token was issued — raise HTTPException(status_code=401, detail="Invalid token"). Otherwise, return user.