인증서버와 인증을 요청하는 클라이언트를 각각 구성하겠습니다.
환경 구성
인증 서버 설정
인증 서버 코드 작성(main.py): 사용자 인증과 JWT 토큰 발급을 처리하는 FastAPI 애플리케이션을 작성합니다. 이 서버는 로그인 엔드포인트(/token)를 제공하고 유효한 자격 증명을 가진 사용자에게 JWT 토큰을 발급합니다.
인증 서버 실행: 인증 서버를 8000 포트에서 실행합니다.
uvicorn uvicorn main:app --reload --port 8000
클라이언트 서버 설정
클라이언트 서버 코드 작성(main_client.py): 인증 서버와 다른 FastAPI 애플리케이션을 작성하여 클라이언트 서버로 사용합니다. 이 서버는 인증이 필요한 엔드포인트를 제공하며, 요청의 Authorization 헤더에서 JWT 토큰을 확인합니다.
클라이언트 서버 실행: 클라이언트 서버를 인증 서버와 다른 포트(8001)에서 실행합니다.
uvicorn uvicorn main_client:app --reload --port 8000
인증 서버
이전의 코드(FastAPI - 17 (보안3 - JWT))과 동일하게 작성합니다.
인증서버 main.py
from datetime import datetime, timedelta
from typing import Union
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
from typing_extensions import Annotated
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
"disabled": False,
}
}
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Union[str, None] = None
class User(BaseModel):
username: str
email: Union[str, None] = None
full_name: Union[str, None] = None
disabled: Union[bool, None] = None
class UserInDB(User):
hashed_password: str
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
app = FastAPI()
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
# credentials_exception 변수를 사용하여 예외 처리 설정
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
return user
async def get_current_active_user(
current_user: Annotated[User, Depends(get_current_user)]
):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.post("/token", response_model=Token)
async def login_for_access_token(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()]
):
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/users/me/", response_model=User)
async def read_users_me(
current_user: Annotated[User, Depends(get_current_active_user)]
):
return current_user
@app.get("/users/me/items/")
async def read_own_items(
current_user: Annotated[User, Depends(get_current_active_user)]
):
return [{"item_id": "Foo", "owner": current_user.username}]
인증서버 실행
uvicorn main:app --reload --port 8000
위의 명령어를 실행하고 아래의 URL로 접속하여 docs를 확인합니다.
http://localhost:8000/docs
아래의 화면이 출력됩니다.
api를 테스트 해 보셔도 좋습니다.
클라이언트 서버
클라이언트 서버 main_client.py
from fastapi import FastAPI, Form, HTTPException, Request, Response
from fastapi.responses import HTMLResponse
import httpx
app = FastAPI()
# 인증 서버의 URL
AUTH_SERVER_URL = "http://localhost:8000"
# HTML 로그인 폼
html_login_form = """
<!DOCTYPE html>
<html>
<head>
<title>Login</title>
</head>
<body>
<form action="/get-token" method="post">
<input type="text" name="username" placeholder="Username">
<input type="password" name="password" placeholder="Password">
<input type="submit" value="Login">
</form>
</body>
</html>
"""
# HTML 메인 페이지
html_main_page = """
<!DOCTYPE html>
<html>
<head>
<title>Main Page</title>
</head>
<body>
</body>
</html>
"""
# 로그인 폼을 반환하는 엔드포인트
@app.get("/", response_class=HTMLResponse)
async def get_login_form():
return html_login_form
# 로그인을 처리하는 엔드포인트
@app.post("/get-token", response_class=HTMLResponse)
async def login_and_get_token(username: str = Form(...), password: str = Form(...)):
url = f"{AUTH_SERVER_URL}/token"
form_data = {
"username": username,
"password": password,
}
async with httpx.AsyncClient() as client:
response = await client.post(url, data=form_data)
if response.status_code != 200:
raise HTTPException(status_code=response.status_code, detail="Login failed")
# JWT 토큰을 쿠키에 저장
response_data = response.json()
token = response_data["access_token"]
content = f'<p>Logged in as {username}</p>' + html_main_page
return Response(content=content, media_type="text/html", headers={"Set-Cookie": f"jwt_token={token}"})
# 사용자 정보를 가져오는 엔드포인트
@app.get("/test-users-me", response_class=HTMLResponse)
async def test_users_me(request: Request):
# 쿠키에서 JWT 토큰을 가져옴
token = request.cookies.get("jwt_token")
if not token:
raise HTTPException(status_code=401, detail="No access token available")
# 인증 서버의 /users/me/ 엔드포인트에 GET 요청을 보냄
url = f"{AUTH_SERVER_URL}/users/me/"
# Authorization 헤더에 Bearer 토큰을 추가
headers = {"Authorization": f"Bearer {token}"}
async with httpx.AsyncClient() as client:
response = await client.get(url, headers=headers)
return Response(content=str(response.json()), media_type="text/html")
# 사용자 item 목록을 가져오는 엔드포인트
@app.get("/test-users-me-items", response_class=HTMLResponse)
async def test_users_me_items(request: Request):
token = request.cookies.get("jwt_token")
if not token:
raise HTTPException(status_code=401, detail="No access token available")
url = f"{AUTH_SERVER_URL}/users/me/items/"
headers = {"Authorization": f"Bearer {token}"}
async with httpx.AsyncClient() as client:
response = await client.get(url, headers=headers)
return Response(content=str(response.json()), media_type="text/html")
클라이언트 서버 실행
uvicorn main_client:app --reload --port 8001
위의 명령어를 실행하고 아래의 URL로 접속하여 docs를 확인합니다.
http://localhost:8001/docs
아래의 화면이 출력됩니다.
테스트
로그인
아래의 URL로 접속하여 로그인을 합니다.
http://localhost:8001
아래의 화면이 출력되면 johndoe
, secret
을 입력합니다.
Login
버튼을 클릭하면 로그인이 완료되고 아래의 화면이 출력됩니다.
로그인이 완료되었고 jwt_token이 쿠키에 저장되었습니다.
사용자 정보 가져오기
사용자 정보를 가져오기 위해 아래의 URL로 접속합니다.
http://localhost:8001/test-users-me
아래의 화면이 출력됩니다.
사용자 item 목록 가져오기
사용자 item 목록정보를 가져오기 위해 아래의 URL로 접속합니다.
http://localhost:8001/test-users-me-items
아래의 화면이 출력됩니다.
'웹 프레임워크 > FastAPI' 카테고리의 다른 글
FastAPI - 20 (APIRouter) (0) | 2023.12.26 |
---|---|
FastAPI - 19 (DB - PostgresSQL) (0) | 2023.12.26 |
FastAPI - 17 (보안3 - JWT) (0) | 2023.12.24 |
FastAPI - 16 (보안2 - QAuth2, Bearer) (0) | 2023.12.24 |
FastAPI - 15 (보안1 - QAuth2, Bearer) (0) | 2023.12.24 |