Oktaユーザー

Okta API を使用したOktaユーザー アカウントのローテーション

概要

本ページでは、「NOOPモード」を使用してOktaとKeeper PAMゲートウェイでパスワードローテーションを設定する方法を解説します。Keeperの記録に設定されたフラグで、ゲートウェイにプライマリローテーション方法をスキップしてポストローテーションスクリプトを直接実行するように指示します。

本ガイドには、前提条件、手順、Pythonスクリプトの例を含めました。

前提条件

  1. KSMアプリケーション: Keeper Secrets Manager (KSM) アプリケーションがセットアップされていることを確かにします。

  2. 共有フォルダ: 共有フォルダが設定されていて、そこにすべての記録が保存されている必要があります。

  3. PAM設定: PAMがセットアップされていること、およびゲートウェイが実行中であり、この設定に接続されていることを確かにします。

  4. Okta APIトークン: Okta APIとのやりとりにOkta APIトークンが必要となります。

手順 1: Okta APIトークンを取得

  1. Oktaの公式ドキュメントの手順に従ってAPIトークンを生成します。

  2. この API トークンをKeeperの記録に保存します。記録タイプは任意のタイプを選択できますが、この例では「ログイン」タイプを使用します。

    • APIトークンを「パスワード」フィールドに保存します。

    • 組織のURL をウェブサイトのアドレス」フィールドに保存します。

  3. この記録に「Okta API Access Details」という名前を付けます。この名前は、後でスクリプトで記録を取得するために使用されます。

手順 2: ローテーション記録を設定

新しいPAMユーザー記録を作成して、パスワードをローテーションする対象Oktaユーザーの詳細を保存します。

  • Oktaユーザーのメールアドレスと一致するようにユーザー名を設定します。

  • パスワードをユーザーに設定されている現在のパスワードに設定します。

Okta SDKでは、現在のパスワードが有効な場合にのみパスワードのローテーションを行います。パスワードが間違っている場合、ローテーションは失敗します。

手順 3: PAMスクリプトを追加

  1. パスワードのローテーションを実行する下のPythonスクリプトを添付します。スクリプトの各行にはコメントが含まれています。

  2. 「ローテーションクレデンシャル」の記録 (手順 1で作成したOkta APIトークンと組織URLを含む記録) を追加します。

  3. ノーオペレーション (NOOP) アトミック実行を有効にします。

    • ユーザーの詳細が保存されている現在のPAMユーザー記録に、NOOPというラベルの新しいカスタムフィールドを作成し、その値をTrueに設定します。

手順 4: パスワードローテーションを設定

  1. ローテーションタイプ: この例では「オンデマンド」に設定します。

  2. パスワードの複雑さ: 特別な要件がない限り、デフォルトのままにしておきます。

  3. ローテーション設定: 前に設定した PAM設定にします。

  4. 管理資格情報記録: 空のままにしておく必要があります。

以下は、完全に情報が記入されたOktaローテーション記録のスクリーンショットです。

Step 5: Python環境の設定

以下は、Keeper Gateway が実行されている環境での手順となります。

  1. Python環境に必要な依存関係を満たすようすべてインストールされていることを確認してください。

  2. 仮想環境を使用する場合は、スクリプトの先頭にシバン行を追加します。

シバン行にスペースが含まれていないようにしてください。スペースが存在する場合は、スペースを含まないシンボリックリンクを作成してください。

以下は、Linuxでシンボリックリンクを作成する例となります。 sudo ln -s "/Users/john/PAM Rotation Example/.venv/bin/python3" /usr/local/bin/pam_rotation_venv_python3

Pythonスクリプト

以下のPython スクリプトは、ベストプラクティスに従って詳しいコメントが付いています。必要なモジュールをインポートし、変数を初期化し、タイトルによるパスワードの検索、すべての Okta ユーザーの取得、特定のユーザーのパスワードのローテーションなど、さまざまなタスクの関数を定義します。

#!/usr/local/bin/pam_rotation_venv_python3

# 注: Pythonインタープリタへのパスにスペースが含まれている場合、スクリプトは実行に失敗します。
#     これはLinuxのシバン行の既知の制限であり、スペースを含まないパスに 
#     Pythonインタープリタへのシンボリックリンクを作成する必要があります。
#     例: sudo ln -s "/usr/local/bin/my python3.11" /usr/local/bin/pam_rotation_venv_python3

import asyncio
import sys
import base64
import json

# 注: Okta SDKでは、以前のパスワードが指定されている場合にのみパスワードのリセットが可能です。
#     処理が失敗した場合、このスクリプトでは古いパスワードに戻されませんので
#     スクリプトを実行する前に、古いパスワードが正しいことを確認してください。
#     このスクリプトは一例であり、Keeper Securityではサポートされていません。


# Pythonのバージョンを表示
print(f"# Python version: {sys.version}")

# インストールされているパッケージを表示します。 
# 以下の行のコメントを解除して環境をチェックし、必要なパッケージがインストールされていることを確かにします。

# import pkg_resources
# print("# \n# Print installed packages for debugging purposes:")
# installed_packages = pkg_resources.working_set
# installed_packages_list = sorted(["%s==%s" % (i.key, i.version) for i in installed_packages])
# for m in installed_packages_list:
#     print(f"  {m}")

# Oktaモジュールをインポート
try:
    import okta
    from okta.client import Client as OktaClient
except ImportError:
    print("# Error: Okta client package is not installed. Run 'pip install okta' to install.")
    exit(1)


# 記録内のタイトルでパスワードを特定する関数
def find_password_by_title(records, target_title):
    """Search for a password by its title in the given records."""
    for record in records:
        if record['title'] == target_title:
            return record['password']
    return None  # Return None if no matching record is found


# 変数を初期化
okta_api_access_details_title = 'ROT5: Okta API Access Details'
okta_api_access_details_record = None

# stdinから入力パラメータを読み取り、デコードします
for base64_params in sys.stdin:
    params = json.loads(base64.b64decode(base64_params).decode())

    # デバッグ用に利用可能なパラメータを表示
    # print(f"# \n# Available params for the script:")
    # for key, value in params.items():
    #     print(f"#     {key}={value}")

    # 記録をデコードしてロード
    records = json.loads(base64.b64decode(params.get('records')).decode())

    # Okta APIアクセス詳細記録を特定
    okta_api_access_details_record = next(
        (record for record in records if record['title'] == okta_api_access_details_title), None)
    break

# Okta APIクレデンシャルが見つからない場合は終了
if okta_api_access_details_record is None:
    print(f"# Error: No Okta API Credentials record found with title: {okta_api_access_details_title}")
    exit(1)

# Okta設定の詳細を抽出
okta_org_url = okta_api_access_details_record.get('url')
okta_org_token = okta_api_access_details_record.get('password')
old_password = params.get('oldPassword')
new_password = params.get('newPassword')

# Oktaクライアントを初期化
config = {
    "orgUrl": okta_org_url,
    "token": okta_org_token
}
okta_client = OktaClient(config)


async def get_all_users():
    """Fetch all Okta users."""
    users, _, err = await okta_client.list_users()
    if err:
        print(f"# Error: {err}")
    return users


async def get_okta_user_by_email(email):
    """Fetch an Okta user by their email."""
    print(f"# Fetching all Okta users...")
    users = await get_all_users()

    print(f"# Searching for user with email: {email}")
    found_user = next((user for user in users if user.profile.email == email), None)

    return found_user


async def rotate(user_id_to_rotate, old_password, new_password):
    """Rotate the password for a given Okta user."""

    change_pwd_request = {
        "oldPassword": old_password,
        "newPassword": new_password,
    }

    result, _, err = await okta_client.change_password(user_id_to_rotate, change_pwd_request)
    if err:
        print(f"# Error: {err}")
    else:
        print(f"# Password changed successfully for user: {user_id_to_rotate}")


async def main():
    """Main function to execute the password rotation."""
    user_email = params.get('user')

    print(f"# Fetching Okta user by email: {user_email}. This is required to get the user's Okta ID.")
    print(f"# If user id is present then there is no need to fetch the user by email and instead just use the user id.")
    user_from_okta = await get_okta_user_by_email(user_email)

    print(f"# Getting user id from Okta user")
    user_from_okta_id = user_from_okta.id if user_from_okta else None

    print(f"# Rotating password for user: {user_from_okta_id}")
    if user_from_okta_id:
        await rotate(user_from_okta_id, old_password, new_password)
    else:
        print("# Error: User {user_email} not found in Okta.")
        print("# Please ensure that the user exists in Okta and try again.")


if __name__ == '__main__':
    print("# Starting the main async function...")
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

最終更新