Okta User

Rotating Okta user accounts using the Okta API

Overview

This document explains how to rotate Okta user passwords using the KeeperPAM rotation type "Run PAM scripts only". This option, found in the rotation settings of a PAM User record, tells the Gateway to skip the primary rotation method and directly execute the post-rotation script attached to the PAM User record in the vault.

Prerequisites

  • KSM Application: Ensure that the Keeper Secrets Manager (KSM) application is set up.

  • Shared Folder: A shared folder should be set up where all the records will be stored.

  • PAM Configuration: Ensure that the PAM Configuration is set up and that the Gateway is running and attached to this configuration.

  • Okta API Token: You will need an Okta API Token to interact with the Okta API.

Step 1: Obtain Okta API Token

  1. Follow the steps in the official Okta documentation to generate an API token.

  2. Store this API token in a Keeper record. The record can be of any type, but for this example, we will use a "Login" type.

    • Store the API Token in the "password" field.

    • Store the Organization URL in the "Website Address" field.

  3. Name this record "Okta API Access Details" as this title will be used to fetch the record in the script later.

Okta API Details Record
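
The title matters because the scripts in this document locate the record by title and then read its "password" and "url" fields. A minimal sketch of that lookup follows, assuming the records arrive as a base64-encoded JSON list (as the scripts below decode them). The function names and the sample data are hypothetical and used only for illustration.

```python
import base64
import json


def find_record_by_title(records, title):
    """Return the first record whose 'title' matches, or None if absent."""
    return next((r for r in records if r.get("title") == title), None)


def load_okta_access_details(encoded_records, title="Okta API Access Details"):
    """Decode the base64 JSON record list and return (org_url, api_token)."""
    records = json.loads(base64.b64decode(encoded_records).decode())
    record = find_record_by_title(records, title)
    if record is None:
        raise LookupError(f"No record found with title: {title}")
    # The token is stored in the "password" field, the organization URL in "url".
    return record.get("url"), record.get("password")


# Hypothetical sample data, shaped like the records the scripts read.
sample = [{
    "title": "Okta API Access Details",
    "url": "https://example.okta.com",
    "password": "example-token",
}]
encoded = base64.b64encode(json.dumps(sample).encode()).decode()
org_url, api_token = load_okta_access_details(encoded)
print(org_url)
```

If the record is renamed in the vault, the title used by the script has to be updated to match, otherwise the lookup returns nothing.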

Step 2: Set Up Rotation Record

Create a new PAM User record to store Okta User details whose password will be rotated.

  • Set the username to match the Okta user's email address.

  • Set the password to the current password set for the user.

Note: Okta changes the password only when the current password is supplied and valid. If the password stored in this record is incorrect, the rotation will fail.
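
The current password is needed because it is sent together with the new password in a single request. The sketch below builds (but does not send) a request that mirrors the one made by the Bash script in this document: the same endpoint path, the same "SSWS" authorization header, and the same body shape. The function name and the example values are hypothetical.

```python
import json
import urllib.request


def build_change_password_request(org_url, api_token, user_id,
                                  old_password, new_password):
    """Build the change-password request without sending it."""
    url = f"{org_url.rstrip('/')}/api/v1/users/{user_id}/credentials/change_password"
    # json.dumps escapes quotes and backslashes that may appear in passwords.
    body = json.dumps({
        "oldPassword": old_password,
        "newPassword": new_password,
    }).encode()
    headers = {
        "Accept": "application/json",
        "Content-Type": "application/json",
        "Authorization": f"SSWS {api_token}",
    }
    return urllib.request.Request(url, data=body, headers=headers, method="POST")


# Hypothetical values for illustration only.
request = build_change_password_request(
    "https://example.okta.com", "example-token", "00u123",
    "current-password", "new-password")
print(request.get_method(), request.full_url)

# Sending would look like this; urlopen raises urllib.error.HTTPError on a
# 4xx or 5xx response, for example when the old password is wrong:
# with urllib.request.urlopen(request) as response:
#     print(response.status)
```

Building the body with a JSON encoder rather than string concatenation keeps the request valid when a password contains characters such as double quotes.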

Step 3: Add PAM Script

  • Attach the Python or Bash script provided below to the PAM User record; it performs the password rotation.

  • Add the "Additional Credential" record, which is the "Okta API Access Details" record created in Step 1.

In the example below, we use the Bash script because the Keeper Gateway is running as a Docker container.

PAM Script

Step 4: Configure Password Rotation Settings

  • Rotation Type: Set it to "Run PAM scripts only"

  • PAM Configuration: Select the configuration for your environment

Password Rotation Settings

Environment Setup

In order for the post-rotation script to execute, the host where the Keeper Gateway is running needs to have the necessary execution environment available.
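
One way to check the environment before attaching a script is to run it by hand with input shaped like what the scripts in this document decode: a base64-encoded JSON object on standard input, whose "records" value is itself a base64-encoded JSON list. The sketch below builds such input and decodes it again. It assumes only the keys that the scripts read (user, oldPassword, newPassword, records); the Gateway may pass more. The function names and sample values are hypothetical.

```python
import base64
import json


def build_script_input(user, old_password, new_password, records):
    """Return a base64 string shaped like the input the scripts decode."""
    encoded_records = base64.b64encode(json.dumps(records).encode()).decode()
    params = {
        "user": user,
        "oldPassword": old_password,
        "newPassword": new_password,
        "records": encoded_records,
    }
    return base64.b64encode(json.dumps(params).encode()).decode()


def decode_script_input(encoded):
    """Reverse build_script_input: return (params, records)."""
    params = json.loads(base64.b64decode(encoded).decode())
    records = json.loads(base64.b64decode(params["records"]).decode())
    return params, records


# Hypothetical sample values; use a disposable test account for real runs.
records = [{
    "title": "Okta API Access Details",
    "url": "https://example.okta.com",
    "password": "example-token",
}]
encoded = build_script_input(
    "user@example.com", "current-password", "new-password", records)
params, decoded_records = decode_script_input(encoded)
print(encoded)
```

The printed string can be piped to a script on the Gateway host (for example with echo, as shown in the Bash script comments). Note that running a script against a real Okta organization changes the password of the named user.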

Bash Script

The Bash script below works on the latest Docker version of the Keeper Gateway or any Linux environment that has jq installed.

#!/usr/bin/env bash

# CAUTION: Okta only allows password reset where the previous password is supplied.
# If the operation fails, this script will NOT roll back to the old password.
# Please ensure that the old password is correct before running this script.
# This script is provided as an example only and is not supported by Keeper Security.

# This will be executed as the following
# history -c && echo "BASE64STRING==" | /path/to/script.sh

# Without this the script might report a success
# if something fails in the script.
set -o pipefail -e

IFS= read -r params
json=$(echo "$params" | base64 -d)

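# Bash has no built-in JSON parser.
# In order to parse JSON, a tool like jq or fx is required.
# Each parameter is read individually so that values containing spaces or
# shell metacharacters (common in passwords) are preserved exactly.
user=$(echo "$json" | jq -r '.user')
oldPassword=$(echo "$json" | jq -r '.oldPassword')
newPassword=$(echo "$json" | jq -r '.newPassword')
records=$(echo "$json" | jq -r '.records')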

# Set your Okta API token and organization URL
recordJson=$(echo "$records" | base64 -d)
OKTA_API_RECORD="Okta API Access Details"
OKTA_API_TOKEN=$(echo "$recordJson" | jq -r ".[] | select(.title == \"$OKTA_API_RECORD\").password")
OKTA_ORG_URL=$(echo "$recordJson" | jq -r ".[] | select(.title == \"$OKTA_API_RECORD\").url")

# Set the user's ID and the new password
USER_ID="$user"
OLD_PASSWORD="$oldPassword"
NEW_PASSWORD="$newPassword"

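# Build the JSON body with jq so that quotes and backslashes in the
# passwords are escaped correctly.
payload=$(jq -n --arg old "$OLD_PASSWORD" --arg new "$NEW_PASSWORD" \
    '{oldPassword: $old, newPassword: $new}')

# --fail makes curl exit non-zero on an HTTP error so that "set -e" stops the script.
# Verbose mode (-v) is not used because it prints the Authorization header.
curl --fail --silent --show-error -X POST \
-H "Accept: application/json" \
-H "Content-Type: application/json" \
-H "Authorization: SSWS $OKTA_API_TOKEN" \
-d "$payload" \
"$OKTA_ORG_URL/api/v1/users/$USER_ID/credentials/change_password"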

Python Script

The Python script below uses the Okta Python SDK, which must be installed on the Gateway host (pip install okta). It decodes the parameters passed on standard input, finds the "Okta API Access Details" record by its title, looks up the Okta user by email address to obtain the user ID, and then changes that user's password.

#!/usr/local/bin/python3

import asyncio
import sys
import base64
import json

# CAUTION: Okta SDK only allows password reset where the previous password is supplied.
# If the operation fails, this script will NOT roll back to the old password.
# Please ensure that the old password is correct before running this script.

# Display Python version
print(f"# Python version: {sys.version}")

# Display installed packages. Uncomment lines below to inspect the environment to make sure that the
# required packages are installed.

# import pkg_resources
# print("# \n# Print installed packages for debugging purposes:")
# installed_packages = pkg_resources.working_set
# installed_packages_list = sorted(["%s==%s" % (i.key, i.version) for i in installed_packages])
# for m in installed_packages_list:
#     print(f"  {m}")

# Import Okta modules
try:
    import okta
    from okta.client import Client as OktaClient
except ImportError:
    print("# Error: Okta client package is not installed. Run 'pip install okta' to install.")
    sys.exit(1)


# Function to find a password by its title in the records
def find_password_by_title(records, target_title):
    """Search for a password by its title in the given records."""
    for record in records:
        if record['title'] == target_title:
            return record['password']
    return None  # Return None if no matching record is found


# Initialize variables
okta_api_access_details_title = 'Okta API Access Details'
okta_api_access_details_record = None

# Read and decode input parameters from stdin
for base64_params in sys.stdin:
    params = json.loads(base64.b64decode(base64_params).decode())

    # Print available params for debugging purposes
    # print(f"# \n# Available params for the script:")
    # for key, value in params.items():
    #     print(f"#     {key}={value}")

    # Decode and load records
    records = json.loads(base64.b64decode(params.get('records')).decode())

    # Find the Okta API access details record
    okta_api_access_details_record = next(
        (record for record in records if record['title'] == okta_api_access_details_title), None)
    break

# Exit if Okta API credentials are not found
if okta_api_access_details_record is None:
    print(f"# Error: No Okta API Credentials record found with title: {okta_api_access_details_title}")
    sys.exit(1)

# Extract Okta configuration details
okta_org_url = okta_api_access_details_record.get('url')
okta_org_token = okta_api_access_details_record.get('password')
old_password = params.get('oldPassword')
new_password = params.get('newPassword')

# Initialize Okta client
config = {
    "orgUrl": okta_org_url,
    "token": okta_org_token
}
okta_client = OktaClient(config)


async def get_all_users():
    """Fetch all Okta users."""
    users, _, err = await okta_client.list_users()
    if err:
        print(f"# Error: {err}")
        sys.exit(1)  # Stop here; without a user list the lookup below cannot work
    return users


async def get_okta_user_by_email(email):
    """Fetch an Okta user by their email."""
    print(f"# Fetching all Okta users...")
    users = await get_all_users()

    print(f"# Searching for user with email: {email}")
    found_user = next((user for user in users if user.profile.email == email), None)

    return found_user


async def rotate(user_id_to_rotate, old_password, new_password):
    """Rotate the password for a given Okta user."""

    change_pwd_request = {
        "oldPassword": old_password,
        "newPassword": new_password,
    }

    result, _, err = await okta_client.change_password(user_id_to_rotate, change_pwd_request)
    if err:
        print(f"# Error: {err}")
        sys.exit(1)  # Exit non-zero so the failure is not reported as a success
    print(f"# Password changed successfully for user: {user_id_to_rotate}")


async def main():
    """Main function to execute the password rotation."""
    user_email = params.get('user')

    print(f"# Fetching Okta user by email: {user_email}. This is required to get the user's Okta ID.")
    print(f"# If user id is present then there is no need to fetch the user by email and instead just use the user id.")
    user_from_okta = await get_okta_user_by_email(user_email)

    print(f"# Getting user id from Okta user")
    user_from_okta_id = user_from_okta.id if user_from_okta else None

    print(f"# Rotating password for user: {user_from_okta_id}")
    if user_from_okta_id:
        await rotate(user_from_okta_id, old_password, new_password)
    else:
        print(f"# Error: User {user_email} not found in Okta.")
        print("# Please ensure that the user exists in Okta and try again.")
        sys.exit(1)  # Exit non-zero so the failure is not reported as a success


if __name__ == '__main__':
    print("# Starting the main async function...")
    # asyncio.run creates and closes the event loop; calling
    # asyncio.get_event_loop() with no running loop is deprecated.
    asyncio.run(main())
