SSH PAM SSO LDAP

OK so this will be a short one, but a nerdy one! I’m currently creating a load of systems and using a SSO (Single Sign On) solution.
So I can make a load of services, webapps and servers, but the users only need to login once in one place to access everything. It also means there is a single place to update their details or password.
From an administration point it’s so much easier having one place to manage users and their groups rather than have to log in to loads of different systems, setting up accounts, adding to groups in each different system with different settings and places to learn.

The stack so far:

  • Keycloak – The single sign on solution
  • 389ds – Open source LDAP server
  • Pfsense – The router/switch/firewall
    • OpenVPN – Pfsense VPN plugin which allows auth via LDAP
    • HA Proxy – Pfsense reverse proxy for web applications
  • JumpBox – A Debian server running SSH that we want people to be able to access via SSO

Users can login to services (websites, VPN, etc.) which in turn use keycloak using a variety of methods (typically oidc) and keycloak updates 389ds with the users and groups. OpenVPN only supports LDAP authentication and that is why the system has been set up this way. 389ds’s implementation isn’t posix (or seemingly a standard way of doing things) so any of the typical Linux LDAP authentication methods just dont work (sssd, nslcd or nss_ldap) or at least they didn’t for me. Thats when I discovered PAM!

PAM (Pluggable Authentication Modules) is a system in Linux that controls how users log in. You can configure PAM to choose which checks to run and in what order, making it easy to change how login works without changing the programs themselves.

Now thats sounds simple and exactly what I want. It kind of is, however took a few days to really understand and get working. The plan was: when someone tries to log in via SSH, instead of the SSH server dealing with the authentication, it would be sent to my script. This script would then confirm the details via LDAP and act accordingly.

I created the script, got everything set up. And it wouldn’t work. I tried all sorts, before eventually logging EVERYTHING! Instead of the password, my script was getting: “\x08\n\r\x7fINCORRECT”. It turns out PAM does have to do some basic preliminary checks, one of these being “does this user exist on the system” – If the user doesn’t exist then send “INCORRECT”. Apparently it’s possible to get sssd or others to check this via LDAP first, but I couldn’t get that working. In light of this, I made the script first check if the user exists, if not, we know it’s going to get “incorrect” – so create the user account on the local system and disconnect the authentication attempt.

This means the first time a user tries to connect to SSH, if they have an account on the LDAP server it just drops the connection, doesn’t ask for a password or anything. The next time they try to log in the script will get their password, we can check they belong to the correct group for access and everything will work as we want.
Side note: I created the group “JumpBox” and if users are a member of that group then they can access the server.

So I guess it’s time to show you how to create this, it’s super simple and only requires 3 files:

/etc/pam.d/sshd

This is the PAM config specifically for SSH which tells it the order and types of authentication to use.
This configuration allows the user to first try to login with their SSO account, but if that fails it then will try a local account.

auth    sufficient      /lib/x86_64-linux-gnu/security/pam_python.so /opt/PAM/pam_ldap_user.py
auth    required      pam_unix.so try_first_pass
account required      pam_unix.so
session required      pam_unix.so

/etc/ssh/sshd_config

The most important one here is “KbdInteractiveAuthentication yes” without this is wont work.

AuthenticationMethods keyboard-interactive:pam
UsePAM yes
UseDNS no
PasswordAuthentication no
KbdInteractiveAuthentication yes
ChallengeResponseAuthentication yes
UseLogin no

/opt/PAM/pam_ldap_user.py

Finally the main script that does all of the PAM LDAP SSO magic

#!/usr/bin/env python3
import ldap
import pwd
import grp
import subprocess
import logging
import os
import sys
import time
from functools import wraps
 
# === Debug Setup ===
sys.stderr = open('/var/log/pam_ldap_user.log', 'a')
 
# === Config ===
CONFIG = {
    "EMAIL_DOMAIN": "company.com",
    "LDAP_SERVER": "ldap://[LDAP server IP]",
    "BASE_DN": "dc=company,dc=com",
    "BIND_DN": "cn=[username]",
    "BIND_PASSWORD": "[password]",
    "GROUP_ATTRIBUTE": "memberOf",
    "JUMPALLOWED_GROUP": "JumpBox",
    "USER_FILTER_TEMPLATE": "(mail={email})",
    "LOG_FILE": "/var/log/pam_ldap_user.log",
    "ALLOWED_SHELL": "/bin/bash",
    "LDAP_TIMEOUT": 5
}
 
# === Logging Setup ===
log_format = '%(asctime)s %(levelname)s: %(message)s'
logging.basicConfig(
    filename=CONFIG["LOG_FILE"],
    level=logging.DEBUG,
    format=log_format
)
 
def log_entry_exit(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        log(f"Entering {func.__name__}")
        try:
            result = func(*args, **kwargs)
            log(f"Exiting {func.__name__}")
            return result
        except Exception as e:
            log(f"Error in {func.__name__}: {str(e)}", logging.ERROR)
            raise
    return wrapper
 
def log(msg, level=logging.INFO):
    logging.log(level, msg)
    print(msg, file=sys.stderr)
 
def handle_password(password_input):
    try:
        if isinstance(password_input, str):
            return password_input.strip()
        if isinstance(password_input, bytes):
            try:
                return password_input.decode('utf-8').strip('\x00')
            except UnicodeDecodeError:
                cleaned = password_input.split(b'\x00')[0]
                return cleaned.decode('latin1').strip()
        return str(password_input).strip()
    except Exception as e:
        log(f"Password handling error: {e}")
        return ""
 
# === LDAP Functions ===
@log_entry_exit
def initialize_ldap():
    conn = ldap.initialize(CONFIG["LDAP_SERVER"])
    conn.set_option(ldap.OPT_REFERRALS, 0)
    conn.set_option(ldap.OPT_NETWORK_TIMEOUT, CONFIG["LDAP_TIMEOUT"])
    conn.protocol_version = ldap.VERSION3
    return conn
 
@log_entry_exit
def ldap_authenticate_credentials(email, password):
    try:
        conn = initialize_ldap()
        log(f"Service bind with DN: {CONFIG['BIND_DN']}")
        conn.simple_bind_s(CONFIG["BIND_DN"], CONFIG["BIND_PASSWORD"])
         
        search_filter = CONFIG["USER_FILTER_TEMPLATE"].format(email=email)
        log(f"Searching with filter: {search_filter}")
        result = conn.search_s(
            CONFIG["BASE_DN"],
            ldap.SCOPE_SUBTREE,
            search_filter,
            []
        )
 
        if not result:
            log("User not found", logging.WARNING)
            return False
 
        user_dn = result[0][0]
        log(f"Found user DN: {user_dn}")
 
        user_conn = initialize_ldap()
        user_conn.simple_bind_s(user_dn, password)
        log("User bind successful")
        return True
 
    except ldap.INVALID_CREDENTIALS:
        log("Invalid credentials", logging.ERROR)
        return False
    except ldap.LDAPError as e:
        log(f"LDAP error: {e}", logging.ERROR)
        return False
    finally:
        try:
            conn.unbind()
        except:
            pass
        try:
            user_conn.unbind()
        except:
            pass
 
 
@log_entry_exit
def ldap_check_group(email):
    try:
        conn = initialize_ldap()
        log(f"Service bind with DN: {CONFIG['BIND_DN']}")
        conn.simple_bind_s(CONFIG["BIND_DN"], CONFIG["BIND_PASSWORD"])
         
        search_filter = CONFIG["USER_FILTER_TEMPLATE"].format(email=email)
        log(f"Searching with filter: {search_filter}")
        result = conn.search_s(
            CONFIG["BASE_DN"],
            ldap.SCOPE_SUBTREE,
            search_filter,
            [CONFIG["GROUP_ATTRIBUTE"]]
        )
 
        if not result:
            log("User not found", logging.WARNING)
            return False
 
        user_attrs = result[0][1]
        groups = user_attrs.get(CONFIG["GROUP_ATTRIBUTE"], [])
        required_group = f"cn={CONFIG['JUMPALLOWED_GROUP']},ou=Groups,{CONFIG['BASE_DN']}"
 
        if isinstance(groups, bytes):
            groups = [groups]
 
        groups = [g.decode() if isinstance(g, bytes) else g for g in groups]
 
        if required_group not in groups:
            log(f"User not in required group: {required_group}", logging.WARNING)
            username = email.split('@')[0]
            if local_user_exists(username):
                delete_local_user(username)
            return False
 
        log(f"User in group: {required_group}")
        return True
 
    except ldap.LDAPError as e:
        log(f"LDAP error: {e}", logging.ERROR)
        return False
    finally:
        try:
            conn.unbind()
        except:
            pass  
 
# === Local User Management ===
@log_entry_exit
def local_user_exists(username):
    try:
        pwd.getpwnam(username)
        return True
    except KeyError:
        return False
 
@log_entry_exit
def create_local_user(username):
    try:
        subprocess.run([
            '/usr/sbin/useradd',
            '-m',
            '-s', CONFIG["ALLOWED_SHELL"],
            '-G', CONFIG["JUMPALLOWED_GROUP"],
            username
        ], check=True, capture_output=True, text=True)
        log(f"Created user {username}")
 
    except subprocess.CalledProcessError as e:
        log(f"User creation failed: {e.stderr}", logging.ERROR)
        raise
 
@log_entry_exit
def delete_local_user(username):
    try:
        subprocess.run([
            '/usr/sbin/userdel',
            '-r',
            username
        ], check=True, capture_output=True, text=True)
        log(f"Deleted user {username}")
    except subprocess.CalledProcessError as e:
        log(f"User deletion failed: {e.stderr}", logging.ERROR)
        raise
 
# === PAM Integration ===
def pam_sm_authenticate(pamh, flags, args):
    try:
        user = pamh.get_user()
        if not user:
            log("No username provided", logging.ERROR)
            return pamh.PAM_USER_UNKNOWN
 
        log(f"Authentication attempt for: {user}")
 
        if not local_user_exists(user):
            email = f"{user}@{CONFIG['EMAIL_DOMAIN']}"
            try:
                # Confirm user is in the correct group before creating local user
                if not ldap_check_group(email):
                    log(f"User {user} not in required group, aborting local user creation.", logging.WARNING)
                    return pamh.PAM_AUTH_ERR
         
                create_local_user(user)
                os._exit(1)
 
            except Exception as e:
                log(f"Local user setup failed: {e}", logging.ERROR)
                return pamh.PAM_AUTH_ERR
 
        # Display startup message
        #pamh.conversation(pamh.Message(pamh.PAM_TEXT_INFO, "Python PAM starting..."))
 
        # Prompt for password
        msg = pamh.Message(pamh.PAM_PROMPT_ECHO_OFF, "Enter password: ")
        resp = pamh.conversation(msg)
        password = handle_password(resp.resp)
 
        if not password:
            log("Empty password after prompt", logging.WARNING)
            return pamh.PAM_AUTH_ERR
 
        email = f"{user}@{CONFIG['EMAIL_DOMAIN']}"
        if not ldap_authenticate_credentials(email, password):
            return pamh.PAM_AUTH_ERR
 
        if not ldap_check_group(email):
            return pamh.PAM_AUTH_ERR
 
 
        return pamh.PAM_SUCCESS
 
    except Exception as e:
        log(f"Authentication error: {e}", logging.ERROR)
        return pamh.PAM_AUTH_ERR
 
def pam_sm_setcred(pamh, flags, args):
    return pamh.PAM_SUCCESS
 
def pam_sm_open_session(pamh, flags, args):
    return pamh.PAM_SUCCESS
 
def pam_sm_close_session(pamh, flags, args):
    return pamh.PAM_SUCCESS
 
def pam_sm_acct_mgmt(pamh, flags, args):
    return pamh.PAM_SUCCESS

I also wanted to give users sudo access, the user accounts are created with no passwords. but you can enable sudo for the group with no password in the following file:

/etc/sudoers.d/jumpbox

%JumpBox ALL=(ALL) NOPASSWD: ALL

I hope this has been helpful for someone.
It’s probably not best practice, and I’m sure theres a million different ways to do this, this is just how I solved the problem.

Sharing is caring!

Leave a Reply

Buy Me A Coffee