Oktaユーザー

Okta APIを使用したOktaユーザーアカウントのローテーション

概要

本ページでは、「NOOPモード」を使用してOktaとKeeper PAMゲートウェイでパスワードローテーションを設定する方法を解説します。Keeperのレコードに設定されたフラグで、ゲートウェイにプライマリローテーション方法をスキップしてポストローテーションスクリプトを直接実行するように指示します。

本ページの最後にPythonスクリプトの例を含めました。

要件

  1. KSMアプリケーション: シークレットマネージャー (KSM) アプリケーションがセットアップされていることを確かにします。

  2. 共有フォルダ: 共有フォルダが設定されていて、そこにすべてのレコードが保存されている必要があります。

  3. PAM設定: PAMがセットアップされていること、およびゲートウェイが実行中であり、この設定に接続されていることを確かにします。

  4. Okta APIトークン: Okta APIとのやりとりにOkta APIトークンが必要となります。

1. Okta APIトークンを取得

  1. Oktaの公式ドキュメントの手順に従ってAPIトークンを生成します。

  2. この API トークンをKeeperのレコードに保存します。レコードタイプは任意のタイプを選択できますが、この例では「ログイン」タイプを使用します。

    • APIトークンを「パスワード」フィールドに保存します。

    • 組織のURL をウェブサイトのアドレス」フィールドに保存します。

  3. このレコードに「Okta API Access Details」という名前を付けます。この名前は、後でスクリプトでレコードを取得するために使用されます。

Okta API詳細レコード

2. ローテーションレコードを設定

新しいPAMユーザーレコードを作成して、パスワードをローテーションする対象Oktaユーザーの詳細を保存します。

  • Oktaユーザーのメールアドレスと一致するようにユーザー名を設定します。

  • パスワードをユーザーに設定されている現在のパスワードに設定します。

Okta SDKでは、現在のパスワードが有効な場合にのみパスワードのローテーションを行います。パスワードが間違っている場合、ローテーションは失敗します。

3. PAMスクリプトを追加

  1. パスワードのローテーションを実行する下のPythonスクリプトを添付します。スクリプトの各行にはコメントが含まれています。

  2. 「ローテーションクレデンシャル」のレコード (手順 1で作成したOkta APIトークンと組織URLを含むレコード) を追加します。

  3. ノーオペレーション (NOOP) アトミック実行を有効にします。

    • ユーザーの詳細が保存されている現在のPAMユーザーレコードに、NOOPというラベルの新しいカスタムフィールドを作成し、その値をTrueに設定します。

4. パスワードローテーションを設定

  1. ローテーションタイプ: この例では「オンデマンド」に設定します。

  2. パスワードの複雑さ: 特別な要件がない限り、デフォルトのままにしておきます。

  3. ローテーション設定: 前に設定した PAM設定にします。

  4. 管理資格情報レコード: 空のままにしておく必要があります。

以下は、完全に情報が記入されたOktaローテーションレコードのスクリーンショットです。

Oktaローテーションレコード

5. Python環境の設定

以下は、Keeper Gatewayが実行されている環境での手順となります。

  1. Python環境に必要な依存関係を満たすようすべてインストールされていることを確認します。

  2. 仮想環境を使用する場合は、スクリプトの先頭にシバン行を追加します。

シバン行にスペースが含まれていないようにしてください。スペースが存在する場合は、スペースを含まないシンボリックリンクを作成してください。

以下は、Linuxでシンボリックリンクを作成する例となります。 sudo ln -s "/Users/john/PAM Rotation Example/.venv/bin/python3" /usr/local/bin/pam_rotation_venv_python3

Pythonスクリプト

以下のPythonスクリプトには、詳細なコメントを付けています。必要なモジュールをインポートし、変数を初期化し、タイトルによるパスワードの検索、すべてのOktaユーザーの取得、特定のユーザーのパスワードのローテーションなど、さまざまなタスクの関数を定義します。

#!/usr/local/bin/pam_rotation_venv_python3

# 注: Pythonインタープリタへのパスにスペースが含まれている場合、スクリプトは実行に失敗します。
#     これはLinuxのシバン行の既知の制限であり、スペースを含まないパスに 
#     Pythonインタープリタへのシンボリックリンクを作成する必要があります。
#     例: sudo ln -s "/usr/local/bin/my python3.11" /usr/local/bin/pam_rotation_venv_python3

import asyncio
import sys
import base64
import json

# 注: Okta SDKでは、以前のパスワードが指定されている場合にのみパスワードのリセットが可能です。
#     処理が失敗した場合、このスクリプトでは古いパスワードに戻されませんので
#     スクリプトを実行する前に、古いパスワードが正しいことを確認してください。
#     このスクリプトは一例であり、Keeper Securityではサポートされていません。


# Pythonのバージョンを表示
print(f"# Python version: {sys.version}")

# インストールされているパッケージを表示します。 
# 以下の行のコメントを解除して環境をチェックし、必要なパッケージがインストールされていることを確かにします。

# import pkg_resources
# print("# \n# Print installed packages for debugging purposes:")
# installed_packages = pkg_resources.working_set
# installed_packages_list = sorted(["%s==%s" % (i.key, i.version) for i in installed_packages])
# for m in installed_packages_list:
#     print(f"  {m}")

# Oktaモジュールをインポート
try:
    import okta
    from okta.client import Client as OktaClient
except ImportError:
    print("# Error: Okta client package is not installed. Run 'pip install okta' to install.")
    exit(1)


# レコード内のタイトルでパスワードを特定する関数
def find_password_by_title(records, target_title):
    """Search for a password by its title in the given records."""
    for record in records:
        if record['title'] == target_title:
            return record['password']
    return None  # Return None if no matching record is found


# 変数を初期化
okta_api_access_details_title = 'ROT5: Okta API Access Details'
okta_api_access_details_record = None

# stdinから入力パラメータを読み取り、デコードします
for base64_params in sys.stdin:
    params = json.loads(base64.b64decode(base64_params).decode())

    # デバッグ用に利用可能なパラメータを表示
    # print(f"# \n# Available params for the script:")
    # for key, value in params.items():
    #     print(f"#     {key}={value}")

    # レコードをデコードしてロード
    records = json.loads(base64.b64decode(params.get('records')).decode())

    # Okta APIアクセス詳細レコードを特定
    okta_api_access_details_record = next(
        (record for record in records if record['title'] == okta_api_access_details_title), None)
    break

# Okta APIクレデンシャルが見つからない場合は終了
if okta_api_access_details_record is None:
    print(f"# Error: No Okta API Credentials record found with title: {okta_api_access_details_title}")
    exit(1)

# Okta設定の詳細を抽出
okta_org_url = okta_api_access_details_record.get('url')
okta_org_token = okta_api_access_details_record.get('password')
old_password = params.get('oldPassword')
new_password = params.get('newPassword')

# Oktaクライアントを初期化
config = {
    "orgUrl": okta_org_url,
    "token": okta_org_token
}
okta_client = OktaClient(config)


async def get_all_users():
    """Fetch all Okta users."""
    users, _, err = await okta_client.list_users()
    if err:
        print(f"# Error: {err}")
    return users


async def get_okta_user_by_email(email):
    """Fetch an Okta user by their email."""
    print(f"# Fetching all Okta users...")
    users = await get_all_users()

    print(f"# Searching for user with email: {email}")
    found_user = next((user for user in users if user.profile.email == email), None)

    return found_user


async def rotate(user_id_to_rotate, old_password, new_password):
    """Rotate the password for a given Okta user."""

    change_pwd_request = {
        "oldPassword": old_password,
        "newPassword": new_password,
    }

    result, _, err = await okta_client.change_password(user_id_to_rotate, change_pwd_request)
    if err:
        print(f"# Error: {err}")
    else:
        print(f"# Password changed successfully for user: {user_id_to_rotate}")


async def main():
    """Main function to execute the password rotation."""
    user_email = params.get('user')

    print(f"# Fetching Okta user by email: {user_email}. This is required to get the user's Okta ID.")
    print(f"# If user id is present then there is no need to fetch the user by email and instead just use the user id.")
    user_from_okta = await get_okta_user_by_email(user_email)

    print(f"# Getting user id from Okta user")
    user_from_okta_id = user_from_okta.id if user_from_okta else None

    print(f"# Rotating password for user: {user_from_okta_id}")
    if user_from_okta_id:
        await rotate(user_from_okta_id, old_password, new_password)
    else:
        print("# Error: User {user_email} not found in Okta.")
        print("# Please ensure that the user exists in Okta and try again.")


if __name__ == '__main__':
    print("# Starting the main async function...")
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Bashスクリプトバージョン

以下のBashスクリプトにはコメントが含まれており、ベストプラクティスに従っています。変数の初期化や、特定のタスクを実行する関数の定義が含まれています。例として、タイトルでパスワードを検索する機能や、特定のユーザーに対してパスワードをローテーションする機能が実装されています。

#!/usr/bin/env bash

# 注:Oktaでは、以前のパスワードがある場合にしかパスワードをリセットできません。
#    この操作が失敗した場合、このスクリプトで古いパスワードには戻りません。
#    スクリプトを実行する前に、古いパスワードが正しいことを必ず確認してください。
#    このスクリプトは例であり、Keeper Securityによるサポートの対象外となります。

# 以下のように実行します。
# history -c && echo "BASE64STRING==" | /path/to/script.sh

# 以下がないと、スクリプト内で何かが失敗しても、スクリプトが成功を報告する可能性があります。
set -o pipefail -e

IFS= read -r params
json=$(echo "$params" | base64 -d)

# JSONを解析するための組み込みのパーサーはありません。
# JSONを解析するには、jqやfxのようなツールが必要です。
$(echo "$json" | jq -r 'keys[] as $k | "export \($k)=\(.[$k])"')

# Okta APIトークンと組織のURLを設定
recordJson=$(echo "$records" | base64 -d)
OKTA_API_RECORD="Okta API Access Details"
OKTA_API_TOKEN=$(echo "$recordJson" | jq -r ".[] | select(.title == \"$OKTA_API_RECORD\").password")
OKTA_ORG_URL=$(echo "$recordJson" | jq -r ".[] | select(.title == \"$OKTA_API_RECORD\").url")

# ユーザーのIDと新しいパスワードを設定
USER_ID="$user"
OLD_PASSWORD="$oldPassword"
NEW_PASSWORD="$newPassword"

curl -v -X POST \
-H "Accept: application/json" \
-H "Content-Type: application/json" \
-H "Authorization: SSWS "$OKTA_API_TOKEN"" \
-d '{
    "oldPassword": "'"$OLD_PASSWORD"'",
    "newPassword": "'"$NEW_PASSWORD"'"
}' "$OKTA_ORG_URL/api/v1/users/$USER_ID/credentials/change_password"

最終更新