Administrative Credentials Record (管理者認証情報レコード): 空白のままでも構いません。
ポストローテーションPythonスクリプト
以下は、Snowflakeユーザーの認証情報をローテーションするPAMスクリプトです。
#!/usr/local/bin/pam_rotation_venv_python3
'''
Snowflakeユーザーアカウント用パスワードローテーションスクリプト
このスクリプトでは、指定されたSnowflakeユーザーのパスワードを
Snowflake Connectorパッケージを使用してローテーションします。
これにより、Snowflake環境内でユーザーパスワードの自動更新が可能になります。
注: Pythonインタープリタへのパスにスペースが含まれている場合、スクリプトの実行が失敗します。
これはLinuxのシバン行における既知の制限であり、
スペースを含まないパスにPythonインタープリタへのシンボリックリンクを作成する必要があります。
例: sudo ln -s "/usr/local/bin/my python3.7" /usr/local/bin/pam_rotation_venv_python3
'''
import asyncio
import json
import sys
import base64
'''
デバッグのためにインストールされているパッケージを表示するオプションです。必要に応じてコメントを外して使用してください。
print("# \n# Installed packages for debugging:")
installed_packages = pkg_resources.working_set
installed_packages_list = sorted(["%s==%s" % (i.key, i.version) for i in installed_packages])
for m in installed_packages_list:
print(f" {m}")
'''
# snowflakeコネクタパッケージをインポート
try:
import snowflake.connector
except ImportError:
print("# Error: The 'snowflake connector' package could not be imported. Run 'pip install snowflake-connector-python' to install it.")
exit(1)
def rotate(snowflake_account_name, snowflake_admin_user, snowflake_admin_pass, snowflake_user_name, new_password):
"""
Connects with Snowflake using the snowflake.connector module.
Rotate the password for a given Snowflake user.
Args:
- snowflake_account_name (str): The name of the Snowflake account to connect to.
- snowflake_admin_user (str): The username of the Snowflake admin account.
- snowflake_admin_pass (str): The password of the Snowflake admin account.
- snowflake_user_name (str): The name of the Snowflake user whose password needs to be rotated.
- new_password (str): The new password to be set for the Snowflake user.
Returns:
- None
"""
# snowflake.connectorモジュールを使用して、Snowflakeアカウントに接続します。
try:
conn = snowflake.connector.connect(
user=snowflake_admin_user,
password=snowflake_admin_pass,
account=snowflake_account_name
)
except Exception as E:
print(f"Unable to connect to snowflake account. Error: {E}")
exit(1)
# カーソルオブジェクトを作成
cur = conn.cursor()
# 新しいユーザーのパスワードを変更
try:
change_pass_query = f"ALTER USER {snowflake_user_name} SET PASSWORD = '{new_password}'"
cur.execute(change_pass_query)
except Exception as E:
print(f"Unable to update the password. Error: {E}")
exit(1)
# カーソルと接続を閉じる
cur.close()
conn.close()
print(f"Password successfully rotated for the given Snowflake User - {snowflake_user_name}")
def main():
"""
Main function to rotate the password for a given Snowflake User.
Reads and decodes input parameters from stdin, including the authentication record details
and the new password. Then, updates the password of the specified Snowflake user.
Args:
- None
Returns:
- None
"""
record_title = 'Snowflake Authentication Record' #これは、管理者認証情報を含むレコードのタイトルと同じである必要があります。
admin_credential_record = None
params = None
# 標準入力からパラメータを読み取り、デコードする
for base64_params in sys.stdin:
params = json.loads(base64.b64decode(base64_params).decode())
'''
# Optionally print available params for debugging. Uncomment if needed.
# print(f"# \n# Available params for the script:")
# for key, value in params.items():
# print(f"# {key}={value}")
'''
records = json.loads(base64.b64decode(params.get('records')).decode()) # PAMスクリプトセクションで「Rotation Credential」レコードとしてJSON文字列で渡されたレコードをデコードしてロードする
# タイトルを使って、管理者アカウントの詳細を含むレコードを見つける
admin_credential_record = next((record for record in records if record['title'].lower() == record_title.lower()), None)
break
if admin_credential_record is None:
print(f"# Error: No Record with the access token found. Title: {record_title}")
exit(1)
# レコードから詳細を抽出する
snowflake_account_name = admin_credential_record.get('snowflake_account_name')
snowflake_admin_user = admin_credential_record.get('login')
snowflake_admin_pass = admin_credential_record.get('password')
# パスワードをローテーションする必要があるユーザーのユーザー名
snowflake_user_name = params.get('user')
# 新しくローテーションされたパスワードを抽出
new_password = params.get('newPassword')
if not all([snowflake_account_name, snowflake_admin_user, snowflake_admin_pass]):
print("# Error: One or more required fields are missing in the authentication record.")
exit(1)
# 指定されたSnowflakeユーザーのパスワードをローテーションする
rotate(snowflake_account_name, snowflake_admin_user, snowflake_admin_pass, snowflake_user_name, new_password)
if __name__ == "__main__":
main()