#!/usr/local/bin/pam_rotation_venv_python3
# 注: Pythonインタープリタへのパスにスペースが含まれている場合、スクリプトは実行に失敗します。
# これはLinuxのシバン行の既知の制限であり、スペースを含まないパスに
# Pythonインタープリタへのシンボリックリンクを作成する必要があります。
# 例: sudo ln -s "/usr/local/bin/my python3.11" /usr/local/bin/pam_rotation_venv_python3
import asyncio
import sys
import base64
import json
# 注: Okta SDKでは、以前のパスワードが指定されている場合にのみパスワードのリセットが可能です。
# 処理が失敗した場合、このスクリプトでは古いパスワードに戻されませんので
# スクリプトを実行する前に、古いパスワードが正しいことを確認してください。
# このスクリプトは一例であり、Keeper Securityではサポートされていません。
# Pythonのバージョンを表示
print(f"# Python version: {sys.version}")
# インストールされているパッケージを表示します。
# 以下の行のコメントを解除して環境をチェックし、必要なパッケージがインストールされていることを確かにします。
# import pkg_resources
# print("# \n# Print installed packages for debugging purposes:")
# installed_packages = pkg_resources.working_set
# installed_packages_list = sorted(["%s==%s" % (i.key, i.version) for i in installed_packages])
# for m in installed_packages_list:
# print(f" {m}")
# Oktaモジュールをインポート
try:
import okta
from okta.client import Client as OktaClient
except ImportError:
print("# Error: Okta client package is not installed. Run 'pip install okta' to install.")
exit(1)
# レコード内のタイトルでパスワードを特定する関数
def find_password_by_title(records, target_title):
"""Search for a password by its title in the given records."""
for record in records:
if record['title'] == target_title:
return record['password']
return None # Return None if no matching record is found
# 変数を初期化
okta_api_access_details_title = 'ROT5: Okta API Access Details'
okta_api_access_details_record = None
# stdinから入力パラメータを読み取り、デコードします
for base64_params in sys.stdin:
params = json.loads(base64.b64decode(base64_params).decode())
# デバッグ用に利用可能なパラメータを表示
# print(f"# \n# Available params for the script:")
# for key, value in params.items():
# print(f"# {key}={value}")
# レコードをデコードしてロード
records = json.loads(base64.b64decode(params.get('records')).decode())
# Okta APIアクセス詳細レコードを特定
okta_api_access_details_record = next(
(record for record in records if record['title'] == okta_api_access_details_title), None)
break
# Okta APIクレデンシャルが見つからない場合は終了
if okta_api_access_details_record is None:
print(f"# Error: No Okta API Credentials record found with title: {okta_api_access_details_title}")
exit(1)
# Okta設定の詳細を抽出
okta_org_url = okta_api_access_details_record.get('url')
okta_org_token = okta_api_access_details_record.get('password')
old_password = params.get('oldPassword')
new_password = params.get('newPassword')
# Oktaクライアントを初期化
config = {
"orgUrl": okta_org_url,
"token": okta_org_token
}
okta_client = OktaClient(config)
async def get_all_users():
"""Fetch all Okta users."""
users, _, err = await okta_client.list_users()
if err:
print(f"# Error: {err}")
return users
async def get_okta_user_by_email(email):
"""Fetch an Okta user by their email."""
print(f"# Fetching all Okta users...")
users = await get_all_users()
print(f"# Searching for user with email: {email}")
found_user = next((user for user in users if user.profile.email == email), None)
return found_user
async def rotate(user_id_to_rotate, old_password, new_password):
"""Rotate the password for a given Okta user."""
change_pwd_request = {
"oldPassword": old_password,
"newPassword": new_password,
}
result, _, err = await okta_client.change_password(user_id_to_rotate, change_pwd_request)
if err:
print(f"# Error: {err}")
else:
print(f"# Password changed successfully for user: {user_id_to_rotate}")
async def main():
"""Main function to execute the password rotation."""
user_email = params.get('user')
print(f"# Fetching Okta user by email: {user_email}. This is required to get the user's Okta ID.")
print(f"# If user id is present then there is no need to fetch the user by email and instead just use the user id.")
user_from_okta = await get_okta_user_by_email(user_email)
print(f"# Getting user id from Okta user")
user_from_okta_id = user_from_okta.id if user_from_okta else None
print(f"# Rotating password for user: {user_from_okta_id}")
if user_from_okta_id:
await rotate(user_from_okta_id, old_password, new_password)
else:
print("# Error: User {user_email} not found in Okta.")
print("# Please ensure that the user exists in Okta and try again.")
if __name__ == '__main__':
print("# Starting the main async function...")
loop = asyncio.get_event_loop()
loop.run_until_complete(main())