Azureアプリケーションシークレットローテーション

Keeperシークレットマネージャーローテーションを使用してAzureアプリのシークレットを自動的にローテーション

概要

本ページでは、Keeperシークレットマネージャーのローテーションの「NOOPモード」を使用して Azureアプリケーションのシークレットをローテーションする方法について解説します。「NOOPモード」は、Keeperレコード内で設定するフラグであり、これによりゲートウェイは主なローテーション方式を飛ばして、ボルト内のPAMユーザーレコードに添付されたポストローテーションスクリプトを直接実行するように指示します。

本ページには、要件、手順、Pythonスクリプトの例が含まれています。スクリプトでは、以前のシークレットを削除して、新しいアプリケーションシークレットをKeeperに保存するなど、安全にアプリケーションシークレットをローテーションするようにします。この新しいシークレットは自動的に、既に許可されているすべてのKSMアプリケーションおよびユーザーが利用できるようになります。

要件

  1. KSMロール: Keeperユーザーが、KeeperシークレットマネージャーおよびKeeperシークレットマネージャーローテーションへのアクセスできるロールに属していることを確かにします。

  2. Keeperゲートウェイを実行するLinuxインスタンス:ゲートウェイは、任意のプライベートクラウドまたはオンプレミス環境にデプロイできます。ゲートウェイホストには、以下の 2つの依存関係とともに、サポートされているバージョンのPythonがインストールされている必要があります。

pip3 install keeper_secrets_manager_core
pip3 install azure.identity

ローテーションスクリプトのロジックフロー

1. 管理者資格情報の取得

スクリプトからは、以下の2つの方法で管理者の認証情報を取得します。

  1. ポストローテーションスクリプトに直接添付されたレコード。

  2. ローテーション用に選択されたAzure PAM構成に提供されるアクセスキー。このアクセスキーは、ポストローテーションスクリプトに添付されたレコード (1で作成したもの) にアクセスキーが見つからない場合に使用されます。

管理アプリケーションのシークレットを含む別のレコードをPAMスクリプトに添付すると、本ページに記載のプロセスを使用して、その別のレコード内で管理アプリケーションのシークレットを簡単にローテーションできるようになります。

2. シークレットのローテーションロジック

スクリプトでは以下を行います。

  1. PAMスクリプトに添付されたレコードまたは PAM構成から管理アプリケーションのシークレットを取得します。

  2. 上記の手順で見つかった管理者アプリケーションのシークレットを使用して、Microsoft Graph アクセストークンを取得します。

  3. PAMユーザーレコードで定義されたAzureアプリケーションで新しいクライアントシークレットを作成します。

  4. 定義されたAzureアプリケーションのその他の既存のシークレットをすべて削除して、上記の手順で生成されたシークレットのみを保持します。

  5. 新しいシークレットとシークレットIDを使用してKeeper PAMユーザーレコードを更新します。

PAM ユーザー レコード - フィールド要件

後ほどローテーションを設定するために、PAMユーザーレコードを作成する必要があります。以下のフィールドを作成する必要があります。

必須フィールド

フィールド名

説明

ログイン

この必須フィールドは、このスクリプトでは使用されません。このフィールドを使用して、ローテーションするAzureアプリケーションの名前などの情報を保存できます。

パスワード

この場合はダミー値になります。パスワードフィールドは自動的にローテーションし、使用されることはありませんが、必須フィールドです。

必要なカスタムフィールド:

カスタムフィールドラベル

フィールドタイプ

説明

application_object_id

テキスト

このフィールドで、Azure内のどのアプリケーションをローテーションするかを指定します。Azureポータル > アプリケーションの登録 > アプリの概要タブ > アプリケーション (クライアント) ID から、ローテーションするアプリケーションのアプリケーションオブジェクトIDを取得する必要があります。

client_secret_id

テキスト

このフィールドには、ローテーション後に新しいクライアントシークレットIDが設定されます。

client_secret

隠しフィールド

このフィールドは、ローテーション後に新しいクライアントシークレットが設定れます。

expires

テキスト

このフィールドには、ローテーション後の新しいシークレットの有効期限が設定されます。

NOOP

テキスト

このローテーションでは、ゲートウェイでローテーション スクリプトのみを実行し、組み込みのローテーション機能を使用してローテーションしないようにする必要があります。

値は以下となります。

True
Private Key Type

テキスト

NOOPを有効にする2番目のフィールド。

値は以下となります。

rsa-ssh

上記の詳細を使用してPAMユーザーレコードを手動で作成する代わりに、以下のcsvファイルをインポートすることもできます。これにより、必要に応じて修正および複製できるテンプレートレコードが作成されます。

ファイルをインポートすると、ログインレコードタイプが生成されますので、必ずPAM ーザーに変換してください。

Keeperボルトでのローテーションの設定

このスクリプトは、Azureに対して認証し、別のアプリケーションのシークレットをローテーションするために、管理者アプリケーション シークレットを必要とします。ここでは、Azure PAM構成で提供される管理者アプリケーションのシークレットを使用します。

Keeperボルトからの設定

  1. ボルトに共有フォルダを作成します。

  2. 上記のフィールドとカスタムフィールドを使用して、共有フォルダにPAMユーザーレコードを作成します。

  3. ゲートウェイがまだ存在しない場合は、Keeperボルトのシークレットマネージャータブでゲートウェイ用の新しいアプリケーションを作成します。

  4. アプリケーションに上記で作成した共有フォルダに対して編集権限があることを確かにします。

  5. Linuxボックスでゲートウェイ (アプリケーションを選択した後のゲートウェイタブ) をプロビジョニングします。Keeperボルトによって提供されるインストール コマンドを実行し、Pythonと上記の依存関係がインストールされていることを確かにします。

  6. Keeperボルトのシークレットマネージャータブで、PAM構成タブに移動します。必要に応じて、新しいPAM構成を作成します。

  7. 「環境」で、「Azure」を選択し、ゲートウェイを選択して、共有フォルダを選択し、「Entra ID」名 (Entra ID 環境の任意の名前)、管理アプリケーションの「クライアント ID」 (Azureポータルの管理アプリケーションの[概要]タブ)、「クライアントシークレット」 (Azureポータルの管理アプリケーションの[証明書とシークレット]タブ)、「サブスクリプション ID」および「テナント ID」を入力します。

  8. 前述のPAMユーザーレコードを編集します。

    • パスワードローテーション設定: 希望するスケジュールと上記で作成したPAM構成を選択します。

    • レコードにPAMスクリプトを追加します。以下のファイルを選択し、スクリプトコマンドを指定してください。

python3

Pythonスクリプト

import logging
import requests
import sys
import base64
import json
from azure.identity import ClientSecretCredential
from datetime import datetime, timedelta, timezone
from keeper_secrets_manager_core import SecretsManager
from keeper_secrets_manager_core.storage import FileKeyValueStorage

# sys.stdinは配列ではなく、添字でアクセスできません(例:sys.stdin[0])。
for base64_params in sys.stdin:
    # ゲートウェイからパラメータを取得する
    params = json.loads(base64.b64decode(base64_params).decode('utf-8'))
    break

# ポストローテーションスクリプトに添付されたレコードをKeeperから取得する
records = json.loads(base64.b64decode(params.get('records')).decode('utf-8'))

# ゲートウェイ設定を使用してKeeperシークレットマネージャーSDKを初期化する
secrets_manager = SecretsManager(config=FileKeyValueStorage('/etc/keeper-gateway/gateway-config.json'))

# ログの設定
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

GRAPH_API_URL = "https://graph.microsoft.com/v1.0"

# ClientSecretCredentialを使用してMicrosoft Graphアクセストークンを取得する関数
def get_access_token(tenant_id, client_id, client_secret):
    try:
        credential = ClientSecretCredential(tenant_id=tenant_id, client_id=client_id, client_secret=client_secret)
        token = credential.get_token("https://graph.microsoft.com/.default")
        logger.info("Authentication successful.")
        return token.token
    except Exception as e:
        logger.error(f"Authentication failed: {e}")
        raise

# 新しいクライアントシークレットを作成し、そのIDとシークレットテキストを返す関数
def create_client_secret(access_token, target_app_id, secret_expiry_days=365):
    try:
        # タイムゾーンを考慮した日時を使用して、シークレットの有効期限を設定する
        expiry_time = (datetime.now(timezone.utc) + timedelta(days=secret_expiry_days)).isoformat()

        # 新しいクライアントシークレットを作成するためのAPIリクエストボディ
        body = {
            "passwordCredential": {
                "displayName": "Client Secret Rotated by Keeper",
                "endDateTime": expiry_time
            }
        }

        headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json"
        }

        response = requests.post(f"{GRAPH_API_URL}/applications/{target_app_id}/addPassword", json=body, headers=headers)
        
        if response.status_code == 200:
            response_json = response.json()
            new_client_secret = response_json['secretText']
            secret_id = response_json['keyId']  # 新しいクライアントシークレットのIDを取得

            # 新しいシークレットとそのIDの両方をログに記録する
            logger.info(f"New client secret created for Application {target_app_id}. Secret ID: {secret_id}.")
            return secret_id, new_client_secret, expiry_time
        else:
            logger.error(f"Failed to create client secret: {response.status_code}, {response.text}")
            raise Exception(f"Failed to create client secret: {response.status_code}")
    except Exception as e:
        logger.error(f"An error occurred: {e}")
        raise

# 既存のクライアントシークレットを取得する関数
def get_existing_client_secrets(access_token, target_app_id):
    try:
        headers = {
            "Authorization": f"Bearer {access_token}"
        }

        response = requests.get(f"{GRAPH_API_URL}/applications/{target_app_id}", headers=headers)
        if response.status_code == 200:
            app = response.json()
            logger.info(f"Retrieved existing client secrets for Application {target_app_id}.")
            return app.get('passwordCredentials', [])
        else:
            logger.error(f"Failed to retrieve client secrets: {response.status_code}, {response.text}")
            raise Exception(f"Failed to retrieve client secrets: {response.status_code}")
    except Exception as e:
        logger.error(f"An error occurred: {e}")
        raise

# 新しく作成されたクライアントシークレットを除く、古いクライアントシークレットを削除する関数
def delete_old_secrets(access_token, target_app_id, new_client_secret_id):
    try:
        headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json"
        }

        existing_secrets = get_existing_client_secrets(access_token, target_app_id)
        
        # すべてのシークレットを繰り返し処理し、新しく作成されたシークレットでないものを削除する
        for secret in existing_secrets:
            if secret['keyId'] != new_client_secret_id:
                body = {
                    "keyId": secret['keyId']
                }
                response = requests.post(f"{GRAPH_API_URL}/applications/{target_app_id}/removePassword", json=body, headers=headers)

                if response.status_code == 204:
                    logger.info(f"Deleted secret with ID: {secret['keyId']} for Application {target_app_id}.")
                else:
                    logger.error(f"Failed to delete secret: {response.status_code}, {response.text}")
                    raise Exception(f"Failed to delete secret: {response.status_code}")
    except Exception as e:
        logger.error(f"An error occurred: {e}")
        raise

# メイン関数
def rotate_client_secret(tenant_id, client_id, client_secret, target_app_id, secret_expiry_days=365, delete_old_secrets_flag=False):
    try:
        # ステップ1: アクセストークンを取得する
        access_token = get_access_token(tenant_id, client_id, client_secret)
        
        # ステップ2: 新しいクライアントシークレットを作成し、そのIDを取得する
        new_client_secret_id, new_client_secret, expiry_time = create_client_secret(access_token, target_app_id, secret_expiry_days)
        if not new_client_secret:
            logger.error("Failed to create a new client secret. Exiting.")
            return None, None, None  # シークレットの作成に失敗した場合は、さらに実行を防止する
        
        logger.info(f"New client secret id: {new_client_secret_id}. Expiration date: {expiry_time}")
        
        # ステップ3: 必要に応じて古いシークレットを削除するが、新しく作成されたシークレットは除外する
        if delete_old_secrets_flag:
            delete_old_secrets(access_token, target_app_id, new_client_secret_id)

        return new_client_secret_id, new_client_secret, expiry_time
    except Exception as e:
        logger.error(f"An error occurred during the secret rotation process: {e}")
        return None, None, None  # エラーが発生した場合は、`None`を返す

# ゲートウェイからの入力と、KSMを使用した完全なPAMユーザークエリ
pam_user_to_update = secrets_manager.get_secrets([params.get('userRecordUid')])[0]

# アプリケーションのオブジェクトIDを取得してローテーションを実行する
try:
    target_app_id = pam_user_to_update.custom_field('application_object_id', single=True)
except Exception as e:
    raise RuntimeError(f"No app object id found, the script will exit after this error: {e}")

# 管理者ユーザーアカウントのアクセスキーを取得する
admin_cred_provided = False
for record in records:
    if "application_client_id" in record:
        client_id = record["application_client_id"]
        client_secret = record["client_secret"]
        tenant_id = record["tenant_id"]
        admin_cred_provided = True
        logger.info(f"Admin creds provided. Using secret_id {client_id}, from attached record UID {record['uid']}.")
        break
    elif "clientId" in record:
        client_id = record["clientId"]
        client_secret = record["clientSecret"]
        tenant_id = record["tenantId"]
        admin_cred_provided = True
        logger.info(f"Admin creds provided. Using secret_id {client_id}, from attached PAM Config record UID {record['uid']}.")
        break

# もし添付されたレコードにアプリケーションキーが提供されていない場合は、PAM設定を使用する
if admin_cred_provided == False:
    try:
        pam_config = secrets_manager.get_secrets([params.get('providerRecordUid')])[0]
        client_id = pam_config.field('clientId', single=True)
        client_secret = pam_config.field('clientSecret', single=True)
        tenant_id = pam_config.field('tenantId', single=True)
        admin_cred_provided = True
        logger.info(f"Using PAM config creds. Using secret_id {client_id}, from PAM Config UID.")
    except Exception as e:
        logger.error(f"Error retrieving admin creds from PAM config: {e}")

# 認証情報が提供されていない場合は終了する
if not admin_cred_provided:
    raise RuntimeError("No admin creds provided. The script will exit.")

# クライアントシークレットをローテーションし、成功/失敗を処理する
new_client_secret_id, new_client_secret, expiry_time = rotate_client_secret(
    tenant_id, client_id, client_secret, target_app_id, secret_expiry_days=365, delete_old_secrets_flag=True
)

if new_client_secret_id and new_client_secret:
    logger.info("\nRotation Result:")
    logger.info(f"New secret id: {new_client_secret_id}")

    # KeeperでPAMユーザーレコードを更新する
    try:
        pam_user_to_update.custom_field('client_secret_id', new_client_secret_id)
        pam_user_to_update.custom_field('client_secret', new_client_secret)
        pam_user_to_update.custom_field('expires', expiry_time)
        secrets_manager.save(pam_user_to_update)
    except Exception as e:
        logger.error(f"Error while updating PAM User record in Keeper: {e}")

最終更新