본문 바로가기

웹 프레임워크/FastAPI

FastAPI - 17 (보안3 - JWT)

출처: https://fastapi.tiangolo.com/tutorial/security/oauth2-jwt/
아래의 내용은 공식 사이트의 내용을 제 경험과 생각을 추가하여 다시 정리한 것 입니다.

JWT란

먼저 OAuth 2.0에서 발급되는 Bearer 토큰의 형태 중 하나가 JWT입니다.

  • JWT 정의: JWT (JSON Web Tokens)는 JSON 객체를 긴 문자열로 인코딩하는 표준입니다. 이 문자열은 공백 없이 밀집되어 있습니다.
  • 예시 형태: JWT는 다음과 같은 형태를 가집니다:
    • eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
  • 비암호화: JWT는 암호화되지 않았으므로, 누구나 내용을 복구할 수 있습니다.
  • 서명 포함: JWT는 서명되어 있어, 발급자는 자신이 발급한 토큰인지를 확인할 수 있습니다.
  • 만료 기간 설정 가능: 예를 들어, 1주일 후 만료되는 토큰을 생성할 수 있습니다. 사용자가 다음 날 토큰을 가지고 돌아오면, 사용자가 여전히 시스템에 로그인된 상태임을 알 수 있습니다.
  • 만료 및 변조 감지: 토큰이 만료되면 사용자는 재로그인해야 하며, 토큰을 변조하려고 시도할 경우 서명 불일치로 감지할 수 있습니다.

JWT의 구조

  1. 헤더(Header): 토큰의 타입(주로 JWT)과 사용된 해시 알고리즘(예: HMAC SHA256 또는 RSA)을 지정합니다.
  2. { "alg": "HS256", "typ": "JWT" }
  3. 페이로드(Payload): 토큰에 담을 클레임(Claims)을 포함합니다. 클레임은 사용자에 대한 속성이나 데이터를 의미하며, 이는 인증된 사용자의 식별 정보를 포함할 수 있습니다.
  4. { "sub": "1234567890", "name": "John Doe", "admin": true }
  5. 서명(Signature): 토큰이 유효하고 무결성이 보장되었음을 증명하기 위해 사용됩니다. 서명은 헤더와 페이로드의 인코딩된 값을 합치고, 비밀 키로 해시하여 생성됩니다.

패키지 설치

JWT를 사용하기 위해 python-jose를 설치합니다. python-jose는 추가로 암호화 백엔드가 필요해서 아래와 같이 설치합니다.

pip install "python-jose[cryptography]"

비밀번호 해싱을 위해 passlib를 설치합니다. 'passlib'은 많은 보안 해시 알고리즘과 유틸리티를 함께 사용할 수 있도록 지원합니다. 권장 알고리즘은 "Bcrypt"입니다.

pip install "passlib[bcrypt]

JWT 적용

이전의 코드(FastAPI - 16 (보안2 - QAuth2, Bearer))를 이용하여 추가 합니다.

JWT 토큰 생성 및 반환

  • login_for_access_token: 로그인을 합니다.
    • authenticate_user: 사용자 인증을 합니다.
    • create_access_token: JWT 토큰을 생성합니다.
    • Bearer 타입의 access_token을 반환합니다.

JWT 토큰을 이용하여 현재 사용자 가져오기

  • read_users_me: 현재 사용자 정보를 반환합니다.
    • get_current_active_user: get_current_user를 이용 하여 활성화 된 사용자 정보를 가져옵니다.
    • get_current_user: JWT 토큰을 디코딩하고 전달 된 사용자 정보를 가져옥 get_user를 이용 하여 사용자를 인증하고 가져옵니다.
    • get_user: 사용자가 존재하는지 인증합니다.

코드

from datetime import datetime, timedelta
from typing import Union

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
# JWT 관련 패키지 import
from jose import JWTError, jwt
# 비밀번호 해싱을 위한 패키지 import
from passlib.context import CryptContext
from pydantic import BaseModel
from typing_extensions import Annotated

# JWT를 생성하고 검증할 때 사용되는 비밀 키
# openssl rand -hex 32 - 랜던한 값을 얻음
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
# JWT를 생성하고 검증할 때 사용되는 암고리즘
ALGORITHM = "HS256"
# JWT 토큰의 만료 시간 - 30분
ACCESS_TOKEN_EXPIRE_MINUTES = 30


fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    }
}


# response_model로 사용할 Token 클래스 생성
class Token(BaseModel):
    access_token: str
    token_type: str


# 토큰을 디코딩할 때 사용할 TokenData 클래스 생성
class TokenData(BaseModel):
    username: Union[str, None] = None


# pydantic의 BaseModel을 사용
class User(BaseModel):
    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None


class UserInDB(User):
    hashed_password: str


### passlib 라이브러리를 사용하여 비밀번호 해싱을 위한 설정을 정의
# CryptContext: 암호 관련 작업을 위한 각종 설정을 캡슐화
# schemes=["bcrypt"]: bcrypt를 해싱 알고리즘으로 지정
# deprecated="auto": 더 이상 사용되지 않는 알고리즘이 자동으로 업데이트되도록 설정
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# 인증 헤더에서 Bearer 토큰을 추출하는 데 사용 할 OAuth2PasswordBearer 인스턴스 생성
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

app = FastAPI()


# 평문 패스워드와 해싱된 패스워드를 비교하는 함수
def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


# 패스워드를 해싱하는 함수
def get_password_hash(password):
    return pwd_context.hash(password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


# 사용자 인증 함수: 실 업무에서는 DB에서 가져오는 경우가 많습니다.
def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


### data를 JWT로 생성하는 함수
# data: 토큰에 담을 데이터
# expires_delta: 토큰의 만료 시간을 설정
def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
    to_encode = data.copy()
    # 토큰의 만료 시간이 있다면 토큰의 만료 시간을 설정
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    # 그렇지 않으면 기본 만료 시간을 15분으로 설정
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    # 토큰에 만료 시간을 추가
    to_encode.update({"exp": expire})
    # jwt.encode 함수를 사용하여 to_encode 딕셔너리를 JWT로 인코딩
    # SECRET_KEY: 토큰을 서명의 비밀 키, ALGORITHM: 암호화 알고리즘
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


#  JWT를 사용하여 현재 사용자를 확인 함수
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
    # credentials_exception 변수를 사용하여 예외 처리 설정
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        # jwt.decode 함수를 사용하여 토큰을 디코딩하여 페이로드를 가져옴
        # SECRET_KEY: 토큰을 서명의 비밀 키, ALGORITHM: 암호화 알고리즘
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        # 페이로드에서 서브젝트(sub)를 가져와서 username 변수에 할당
        # sub는 일반적으로 JWT에서 사용자를 식별하는 주체(subject)를 나타냅니다.
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        # username 값을 가지는 TokenData 클래스의 인스턴스를 생성
        token_data = TokenData(username=username)
    # 예외 발생 시 credentials_exception 예외를 발생시킴
    except JWTError:
        raise credentials_exception
    # 토큰에서 가져온 username을 가진 사용자 정보를 가져옴
    user = get_user(fake_users_db, username=token_data.username)
    # 사용자 정보가 없다면 credentials_exception 예외를 발생시킴
    if user is None:
        raise credentials_exception
    return user


# 현재 활성화 된 사용자 정보를 가져오는 함수
async def get_current_active_user(
    current_user: Annotated[User, Depends(get_current_user)]
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


# 토큰을 생성하는 함수(토큰을 생성하기 위해 인증 - 로그인 하는 함수)
@app.post("/token", response_model=Token)
async def login_for_access_token(
    # 클라이언트가 전달한 username과 password를 OAuth2PasswordRequestForm을 사용하여 가져옴
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()]
):
    # 사용자 인증 
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    # 사용자 정보가 없다면, HTTPException을 발생시킵니다.
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    # access_token의 만료 시간을 설정
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    # 토큰을 생성하여 반환
    # 'sub' key에 username을 저장, 만료 시간을 설정
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    # 'Bearer' 타입의 access_token을 반환
    return {"access_token": access_token, "token_type": "bearer"}


# 현재 사용자 정보를 반환하는 함수
@app.get("/users/me/", response_model=User)
async def read_users_me(
    current_user: Annotated[User, Depends(get_current_active_user)]
):
    return current_user


# 현재 사용자의 아이템 정보를 반환하는 함수
@app.get("/users/me/items/")
async def read_own_items(
    current_user: Annotated[User, Depends(get_current_active_user)]
):
    return [{"item_id": "Foo", "owner": current_user.username}]

참고

SECRET_KEY는 랜덤한 값으로 설정해야 합니다.
터미널에서 아래의 명령어를 실행하여 랜덤한 값을 얻을 수 있습니다.

  openssl rand -hex 32

윈도우는 openssl이 설치되어 있지 않으므로 아래의 사이트에서 다운로드 받아 설치 해야 합니다.
https://code.google.com/archive/p/openssl-for-windows/downloads

테스트

위의 코드를 실행 하고 fastapi의 docs를 확인 합니다.

http://localhost:8000/docs

로그인 하여 토큰 받기('/token')

먼저 로그인(인증 절차)을 하기 위해 /token를 실행 해 보겠습니다.

username(johndoe)과 password(secret)를 입력하고 Execute를 클릭 합니다.

Response body를 보겠습니다.
로그인이 완료되어 access_tokentoken_type(Bearer 타입)이 출력되었습니다.

{
  "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJqb2huZG9lIiwiZXhwIjoxNzAzMzI2NzQ0fQ.l3JA4p_0u0jrwBQ5EtbYvT438QIWSvloAfNzbfdX994",
  "token_type": "bearer"
}

access_token은 이전과 다르게 JWT 토튼 형태로 해싱되어 출력됩니다.

'Authorization'를 이용하여 로그인 하기

이전의 코드(FastAPI - 16 (보안2 - QAuth2, Bearer))를 참조하여 Authorization를 이용하여 로그인을 하겠습니다.

현재 유저(user) 가져오기('/users/me/')

로그인인 된 상태에서 /users/me를 실행 해 보겠습니다.

별도의 Parameter가 없으므로 Execute를 클릭합니다.

Curl을 보겠습니다.

curl -X 'GET' \
  'http://localhost:8000/users/me/' \
  -H 'accept: application/json' \
  -H 'Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJqb2huZG9lIiwiZXhwIjoxNzAzMzI3MjE0fQ.jSLRpLd7dihlu1cwG4DaZ2ydhVkSUaj_73kUc6rcPGo'

-H 'Authorization: Bearer뒤의 access_token이 JWT 형태(해싱)로 요청 되었습니다.

Response body을 보겠습니다.

{
  "username": "johndoe",
  "email": "johndoe@example.com",
  "full_name": "John Doe",
  "disabled": false
}

로그인을 하거나 서버의 Session에 정보를 저장하지 않고 JWT를 이용하여 인증을 하였습니다.

현재 유저(user)의 items 가져오기('/users/me/items/')

추가로 현재 유저(user)의 items를 가져오는 api를 실행 해 보겠습니다. /users/me/items/를 실행 합니다.

Curl을 보겠습니다.

curl -X 'GET' \
  'http://localhost:8000/users/me/' \
  -H 'accept: application/json' \
  -H 'Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJqb2huZG9lIiwiZXhwIjoxNzAzMzI3MjE0fQ.jSLRpLd7dihlu1cwG4DaZ2ydhVkSUaj_73kUc6rcPGo'

/user/me/와 동일한 방법으로 JWT 형태(해싱)로 요청 되었습니다.

Response body을 보겠습니다.

[
  {
    "item_id": "Foo",
    "owner": "johndoe"
  }
]

동일하게 JWT 토큰 만으로 인증을 하고 현재 유저의 items를 가져왔습니다.