BLOG

ブログ

新着記事

2020.10.07 Python

【Python】FastAPIでトークン認証を実装してみた

 

はじめまして!今年7月にFabeeeに入社しました、まっつんです。
Webエンジニアとして働き始めて3ヶ月が経ち、初案件でFastAPIを使うことになりました。

FastAPIとはモダンで高速、高性能なPythonのWebフレームワークのようです。
他のフレームワークをあまり使ったことがないので比較が難しいですが、個人的に良いと思ったのは次の4つです。

1. OpenAPIドキュメントが自動生成される
2. ドキュメント上でエンドポイントを実行できる
3. 機能がシンプルでわかりやすい
4. 公式ドキュメントが丁寧

今回はそんなFastAPIでトークン認証を実装してみたいと思います。

 

アクセストークンを使った認証


トークン認証では、トークンという文字列を使って認証を行います。
流れは次のようになります。

1. ユーザーがログインした際に、サーバー側でアクセストークンを発行する。
2. ブラウザはアクセストークンを受け取り、保持しておく。
3. ブラウザはエンドポイントにアクセスする時に、ヘッダーにアクセストークンを付与して送信する。
4. サーバーはアクセストークンを受け取って、認証を行い、成功するとレスポンスを返す。
5. アクセストークンの有効期限が切れたら、ユーザーは再びログインする。

アクセストークンはセキュリティの観点から有効期限を1時間程度の短い期間に設定します。

 

リフレッシュトークンを使ってトークンを再取得


リフレッシュトークンを使うと、ログインしっぱなしにすることできます。
流れは次のようになります。

1. ユーザーがログインした際に、サーバー側でアクセストークンとリフレッシュトークンを発行する。
2. サーバー側でユーザーとリフレッシュトークンを紐付けておく。
3. ブラウザはアクセストークンとリフレッシュトークンを受け取り、保持しておく。
4. ブラウザはアクセストークンをつけてエンドポイントにアクセスする。
5. サーバーはアクセストークンを受け取って、認証を行い、成功するとレスポンスを返す。
6. アクセストークンの有効期限が切れたら、ブラウザはトークン再取得のエンドポイントにリフレッシュトークンを送信する。
7. サーバーはリフレッシュトークンを受け取り、ユーザーに紐付いたリフレッシュトークンと一致しているか確認し、認証に成功したらトークンを再発行する。

リフレッシュトークンは有効期限を数ヶ月と長めに設定します。

 

トークンの中身


トークンはJWTを使って作成します。
JWTとは、以下のような文字列です。

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c

構成は以下のようになっています。

[ヘッダー].[ペイロード].[署名]

ヘッダーと署名はPythonのライブラリで生成するので、特に気にする必要はありません。ペイロードにはトークンに含めたいデータを入れます。
今回は「トークンの種類」、「有効期限」、「ユーザーID」とします。

 

FastAPIでトークン認証を実装


ここからは実際にコードを書いていきます。

 

環境


– MacOS Catalina 10.15.6
– python 3.7.8

 

パッケージインストール


先に必要なパッケージをインストールします。

pip3 install fastapi uvicorn python-multipart python-jose peewee

 

モデルの実装


まずは今回使用するモデルを作成しておきます。
今回はpeeweeを使ってsqliteでサクッと実装します。
ファイル名はmodels.pyとします。

from peewee import SqliteDatabase, Model, AutoField, CharField, TextField

db = SqliteDatabase('db.sqlite3')


class User(Model):
    id = AutoField(primary_key=True)
    name = CharField(100)
    password = CharField(100)
    refresh_token = TextField(null=True)

    class Meta:
        database = db


db.create_tables([User])

# ユーザーデータ挿入
User.create(name='tanaka', password='secret_tanaka')
User.create(name='kobayashi', password='secret_kobayashi')

 

認証処理実装


次に、パスワード認証、トークンの生成と認証を行う関数を作成します。
パスワードはハッシュ化等するべきですが、簡略化しています。
ファイル名はauth.pyとします。

from fastapi import Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
from datetime import datetime, timedelta
from jose import jwt

from models import User

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


def authenticate(name: str, password: str):
    """パスワード認証し、userを返却"""
    user = User.get(name=name)
    if user.password != password:
        raise HTTPException(status_code=401, detail='パスワード不一致')
    return user


def create_tokens(user_id: int):
    """パスワード認証を行い、トークンを生成"""
    # ペイロード作成
    access_payload = {
        'token_type': 'access_token',
        'exp': datetime.utcnow() + timedelta(minutes=60),
        'user_id': user_id,
    }
    refresh_payload = {
        'token_type': 'refresh_token',
        'exp': datetime.utcnow() + timedelta(days=90),
        'user_id': user_id,
    }

    # トークン作成(本来は'SECRET_KEY123'はもっと複雑にする)
    access_token = jwt.encode(access_payload, 'SECRET_KEY123', algorithm='HS256')
    refresh_token = jwt.encode(refresh_payload, 'SECRET_KEY123', algorithm='HS256')

    # DBにリフレッシュトークンを保存
    User.update(refresh_token=refresh_token).where(User.id == user_id).execute()

    return {'access_token': access_token, 'refresh_token': refresh_token, 'token_type': 'bearer'}


def get_current_user_from_token(token: str, token_type: str):
    """tokenからユーザーを取得"""
    # トークンをデコードしてペイロードを取得。有効期限と署名は自動で検証される。
    payload = jwt.decode(token, 'SECRET_KEY123', algorithms=['HS256'])

    # トークンタイプが一致することを確認
    if payload['token_type'] != token_type:
        raise HTTPException(status_code=401, detail=f'トークンタイプ不一致')

    # DBからユーザーを取得
    user = User.get_by_id(payload['user_id'])

    # リフレッシュトークンの場合、受け取ったものとDBに保存されているものが一致するか確認
    if token_type == 'refresh_token' and user.refresh_token != token:
        print(user.refresh_token, '¥n', token)
        raise HTTPException(status_code=401, detail='リフレッシュトークン不一致')

    return user


async def get_current_user(token: str = Depends(oauth2_scheme)):
    """アクセストークンからログイン中のユーザーを取得"""
    return get_current_user_from_token(token, 'access_token')


async def get_current_user_with_refresh_token(token: str = Depends(oauth2_scheme)):
    """リフレッシュトークンからログイン中のユーザーを取得"""
    return get_current_user_from_token(token, 'refresh_token')

 

メイン処理実装


最後にメインの処理を記述していきます。
トークン発行、トークン再発行、ログインユーザー取得のエンドポイントを作成します。
ファイル名はmain.pyとします。

from fastapi import Depends, FastAPI
from fastapi.security import OAuth2PasswordRequestForm
from pydantic import BaseModel
from auth import get_current_user, get_current_user_with_refresh_token, create_tokens, authenticate

app = FastAPI()


class Token(BaseModel):
    access_token: str
    refresh_token: str
    token_type: str

    class Config:
        orm_mode = True


class User(BaseModel):
    name: str

    class Config:
        orm_mode = True


@app.post("/token", response_model=Token)
async def login(form: OAuth2PasswordRequestForm = Depends()):
    """トークン発行"""
    user = authenticate(form.username, form.password)
    return create_tokens(user.id)


@app.get("/refresh_token/", response_model=Token)
async def refresh_token(current_user: User = Depends(get_current_user_with_refresh_token)):
    """リフレッシュトークンでトークンを再取得"""
    return create_tokens(current_user.id)


@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_user)):
    """ログイン中のユーザーを取得"""
    return current_user

 

実際に動かしてみる

以下のコマンドでサーバーを起動します。

uvicorn main:app

以下のコマンドを実行することでトークン取得(ログイン)ができます。

curl -X POST "http://localhost:8000/token" -H  "accept: application/json" -H  "Content-Type: application/x-www-form-urlencoded" -d "username=tanaka&password=secret_tanaka"

このようにaccess_tokenとrefresh_tokenが返って来るのでメモしておきます。

{"access_token":"アクセストークン","refresh_token":"リフレッシュトークン","token_type":"bearer"}%

ログインができたので、access_tokenを使ってログインユーザーを取得してみます。

curl -X GET "http://localhost:8000/users/me/" -H  "accept: application/json" -H  "Authorization: Bearer [上で取得したaccess_token]"

このように表示されれば成功です。

{"name":"tanaka"}

次に、refresh_tokenを使ってトークンを再取得してみます。

curl -X GET "http://localhost:8000/refresh_token/" -H  "accept: application/json" -H  "Authorization: Bearer [上で取得したrefresh_token]"

また、FastAPIにはドキュメント自動生成機能があり、そこからでも実行可能です。
ブラウザで以下のURLにアクセスしてみてください。
http://localhost:8000/docs

 

最後に

FastAPIを使ってトークン認証を実装してみましたが、いかがでしたでしょうか。
トークン認証でつまづいている方の助けになれば幸いです。

公式ドキュメントが充実しているので、もっと詳しく知りたい方はそちらも見てみると良いと思います。
https://fastapi.tiangolo.com/