#!/usr/local/bin/pam_rotation_venv_python3# 注: Pythonインタープリタへのパスにスペースが含まれている場合、スクリプトは実行に失敗します。# これはLinuxのシバン行の既知の制限であり、スペースを含まないパスに # Pythonインタープリタへのシンボリックリンクを作成する必要があります。# 例: sudo ln -s "/usr/local/bin/my python3.11" /usr/local/bin/pam_rotation_venv_python3import asyncioimport sysimport base64import json# 注: Okta SDKでは、以前のパスワードが指定されている場合にのみパスワードのリセットが可能です。# 処理が失敗した場合、このスクリプトでは古いパスワードに戻されませんので# スクリプトを実行する前に、古いパスワードが正しいことを確認してください。# このスクリプトは一例であり、Keeper Securityではサポートされていません。# Pythonのバージョンを表示print(f"# Python version: {sys.version}")# インストールされているパッケージを表示します。 # 以下の行のコメントを解除して環境をチェックし、必要なパッケージがインストールされていることを確かにします。# import pkg_resources# print("# \n# Print installed packages for debugging purposes:")# installed_packages = pkg_resources.working_set# installed_packages_list = sorted(["%s==%s" % (i.key, i.version) for i in installed_packages])# for m in installed_packages_list:# print(f" {m}")# Oktaモジュールをインポートtry:import oktafrom okta.client import Client as OktaClientexceptImportError:print("# Error: Okta client package is not installed. Run 'pip install okta' to install.")exit(1)# レコード内のタイトルでパスワードを特定する関数deffind_password_by_title(records,target_title):"""Search for a password by its title in the given records."""for record in records:if record['title']== target_title:return record['password']returnNone# Return None if no matching record is found# 変数を初期化okta_api_access_details_title ='ROT5: Okta API Access Details'okta_api_access_details_record =None# stdinから入力パラメータを読み取り、デコードしますfor base64_params in sys.stdin: params = json.loads(base64.b64decode(base64_params).decode())# デバッグ用に利用可能なパラメータを表示# print(f"# \n# Available params for the script:")# for key, value in params.items():# print(f"# {key}={value}")# レコードをデコードしてロード records = json.loads(base64.b64decode(params.get('records')).decode())# Okta APIアクセス詳細レコードを特定 okta_api_access_details_record =next( (record for record in records if record['title'] == okta_api_access_details_title), None)break# Okta APIクレデンシャルが見つからない場合は終了if okta_api_access_details_record isNone:print(f"# Error: No Okta API Credentials record found with title: {okta_api_access_details_title}")exit(1)# Okta設定の詳細を抽出okta_org_url = okta_api_access_details_record.get('url')okta_org_token = okta_api_access_details_record.get('password')old_password = params.get('oldPassword')new_password = params.get('newPassword')# Oktaクライアントを初期化config ={"orgUrl": okta_org_url,"token": okta_org_token}okta_client =OktaClient(config)asyncdefget_all_users():"""Fetch all Okta users.""" users, _, err =await okta_client.list_users()if err:print(f"# Error: {err}")return usersasyncdefget_okta_user_by_email(email):"""Fetch an Okta user by their email."""print(f"# Fetching all Okta users...") users =awaitget_all_users()print(f"# Searching for user with email: {email}") found_user =next((user for user in users if user.profile.email == email), None)return found_userasyncdefrotate(user_id_to_rotate,old_password,new_password):"""Rotate the password for a given Okta user.""" change_pwd_request ={"oldPassword": old_password,"newPassword": new_password,} result, _, err =await okta_client.change_password(user_id_to_rotate, change_pwd_request)if err:print(f"# Error: {err}")else:print(f"# Password changed successfully for user: {user_id_to_rotate}")asyncdefmain():"""Main function to execute the password rotation.""" user_email = params.get('user')print(f"# Fetching Okta user by email: {user_email}. This is required to get the user's Okta ID.")print(f"# If user id is present then there is no need to fetch the user by email and instead just use the user id.") user_from_okta =awaitget_okta_user_by_email(user_email)print(f"# Getting user id from Okta user") user_from_okta_id = user_from_okta.id if user_from_okta elseNoneprint(f"# Rotating password for user: {user_from_okta_id}")if user_from_okta_id:awaitrotate(user_from_okta_id, old_password, new_password)else:print("# Error: User {user_email} not found in Okta.")print("# Please ensure that the user exists in Okta and try again.")if__name__=='__main__':print("# Starting the main async function...") loop = asyncio.get_event_loop() loop.run_until_complete(main())