当前位置: 首页 > 工具软件 > Python-LDAP > 使用案例 >

python-ldap模块

杜海
2023-12-01

模块作用

python-ldap 是用 Python 操作 LDAP 的库,可以对 LDAP 中的数据进行增删改查。官方文档地址:https://www.python-ldap.org/en/latest/index.html

模块安装

pip install python-ldap

代码示例

  • 不断完善中……
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @File  : testldap.py
# @Author: xxx.xxxxx
# @Date  : 2022/3/3 16:34
# @Desc  :
import ldap
import ldap.modlist as modlist

# Connection settings read by LDAPBackend.
LDAP_CONFIG = dict(
    # LDAP URI handed to ldap.initialize().
    HOST="ldap://127.0.0.1:389",
    # DN and password used for the administrative bind.
    USERNAME="cn=admin,dc=example,dc=org",
    PASSWORD="admin",
    # Suffix appended to every DN this module builds; also the default search base.
    BASE_DN="dc=example,dc=org",
)


class LDAPBackend(object):
    """
    Authenticate against, and administer entries in, an LDAP directory.

    Connection settings (``HOST``, ``USERNAME``, ``PASSWORD``, ``BASE_DN``)
    are read from the module-level ``LDAP_CONFIG`` dict.  One connection
    object is created lazily and reused; it is bound with the configured
    admin account the first time :attr:`connection` is accessed.
    """
    # Lazily created LDAP connection object (see _get_connection).
    _connection = None
    # True only while the connection is bound as the configured admin account.
    _connection_bound = False
    # Member DN used by create_ldap_group when the caller does not supply one.
    _DEFAULT_GROUP_MEMBER = "cn=luonan,cn=Users,dc=example,dc=org"

    def authenticate(self, username=None, passwd=None, **kwargs):
        """
        Check a user's credentials by binding to the directory as that user.

        :param username: value of the user's ``cn`` directly under BASE_DN.
        :param passwd: the user's password.
        :param kwargs: accepted for call compatibility and ignored.
        :return: the string ``"pass"`` on success, ``None`` otherwise.
        """
        # Empty credentials are rejected before any bind is attempted; a
        # simple bind with an empty password is an unauthenticated bind on
        # many servers and must not count as a successful login.
        if not username or not passwd:
            return None
        if self._authenticate_user_dn(username, passwd):
            return "pass"
        return None

    @property
    def connection(self):
        """Return the shared connection, binding as admin first if needed."""
        if not self._connection_bound:
            self._bind()
        return self._get_connection()

    def _bind(self):
        """Bind the shared connection with the admin DN/password from LDAP_CONFIG."""
        self._bind_as(
            LDAP_CONFIG['USERNAME'], LDAP_CONFIG['PASSWORD'], True
        )

    def _bind_as(self, bind_dn, bind_password, sticky=False):
        """
        Bind the shared connection as ``bind_dn``.

        :param sticky: when True the bind is remembered, so
            :attr:`connection` will not re-bind as admin afterwards.
        """
        # Drop the "bound as admin" flag *before* binding: if the bind raises,
        # the connection is no longer authenticated as admin and the next
        # access to `connection` must re-bind.
        self._connection_bound = False
        self._get_connection().simple_bind_s(
            bind_dn, bind_password
        )
        self._connection_bound = sticky

    def _get_connection(self):
        """Return the cached connection object, creating it on first use."""
        if not self._connection:
            self._connection = ldap.initialize(LDAP_CONFIG['HOST'])
        return self._connection

    def _close(self):
        """Unbind the cached connection and forget it so the next use reconnects."""
        conn = self._connection
        # Reset state first so a failing unbind cannot leave a stale handle.
        self._connection = None
        self._connection_bound = False
        if conn is not None:
            conn.unbind_s()

    @staticmethod
    def _encode_attrs(attrs):
        """
        Convert an attribute dict into ``{name: [bytes, ...]}``.

        ``str`` values are UTF-8 encoded, other values (e.g. ``bytes``) are
        passed through unchanged, and a scalar value is wrapped in a
        one-element list.
        """
        def to_bytes(value):
            return value.encode("utf-8") if isinstance(value, str) else value

        encoded = {}
        for key, values in attrs.items():
            if not isinstance(values, (list, tuple)):
                values = [values]
            encoded[key] = [to_bytes(v) for v in values]
        return encoded

    def _authenticate_user_dn(self, username, passwd):
        """
        Try to bind as ``cn=<username>,<BASE_DN>``.

        :return: True if the bind succeeded, False on invalid credentials.
            Other LDAP errors propagate to the caller.
        """
        # Local import keeps this block self-contained; python-ldap is
        # already a dependency of this file.
        from ldap.dn import escape_dn_chars

        # The username comes from the person logging in: escape DN
        # metacharacters so it cannot alter the structure of the bind DN.
        bind_dn = 'cn=%s,%s' % (escape_dn_chars(username), LDAP_CONFIG['BASE_DN'])
        try:
            self._bind_as(bind_dn, passwd, False)
            return True
        except ldap.INVALID_CREDENTIALS:
            return False

    def _get_or_create_user(self, username, passwd):
        """Placeholder for fetching or creating a local user object; not implemented."""
        pass

    def create_ldap_user(self, username, password, ou):
        """
        Create an ``inetOrgPerson`` entry at ``cn=<username>,cn=<ou>,<BASE_DN>``.

        :param username: used for both ``cn`` and ``sn``.
        :param password: stored as ``userPassword`` (``str`` or ``bytes``).
        :param ou: name of the ``cn=`` container that will hold the entry.
        :return: the result of ``add_s`` on success; ``None`` if the entry
            already exists or any other error occurred (the error is printed).
        """
        conn = self.connection
        # Built before the try so the handlers below can always reference it.
        user_dn = 'cn=%s,cn=%s,%s' % (username, ou, LDAP_CONFIG["BASE_DN"])
        try:
            attrs = self._encode_attrs({
                'objectclass': ["top", "person", "organizationalPerson", "inetOrgPerson"],
                'cn': username,
                'sn': username,
                'userPassword': password,
            })
            ret = conn.add_s(user_dn, modlist.addModlist(attrs))
            if ret:
                print(f"dn={user_dn} is successfully created.")
                return ret
        except ldap.ALREADY_EXISTS:
            print(f"dn={user_dn} is Already exists")
        except Exception as e:
            # Best-effort by design: report the failure and return None.
            print(e)
        return None

    def create_ldap_ou(self, ou):
        """
        Create an ``organizationalUnit`` entry at ``ou=<ou>,<BASE_DN>``.

        :param ou: name of the organizational unit.
        :return: the result of ``add_s``; LDAP errors propagate.
        """
        conn = self.connection
        ou_dn = 'ou=%s,%s' % (ou, LDAP_CONFIG["BASE_DN"])
        attrs = self._encode_attrs({
            'objectclass': ["top", "organizationalUnit"],
            'ou': ou,
        })
        ret = conn.add_s(ou_dn, modlist.addModlist(attrs))
        print(f"dn={ou_dn} is successfully created.")
        return ret

    def create_ldap_group(self, ou, gpname, member=None):
        """
        Create a ``groupOfNames`` entry at ``cn=<gpname>,ou=<ou>,<BASE_DN>``.

        :param ou: organizational unit that will hold the group.
        :param gpname: group name (``cn``).
        :param member: DN, or list of DNs, of the initial member(s).  The
            entry is always created with a ``member`` attribute; when omitted,
            the previously hard-coded default DN is used.
        :return: the result of ``add_s``; LDAP errors propagate.
        """
        conn = self.connection
        group_dn = 'cn=%s,ou=%s,%s' % (gpname, ou, LDAP_CONFIG["BASE_DN"])
        attrs = self._encode_attrs({
            'objectclass': ["top", "groupOfNames"],
            'cn': gpname,
            'member': self._DEFAULT_GROUP_MEMBER if member is None else member,
        })
        ret = conn.add_s(group_dn, modlist.addModlist(attrs))
        print(f"group:{group_dn} successfully created!")
        return ret

    def get_ldap_users_dn(self, org_dn=''):
        """
        List the DNs of all ``person`` entries in a subtree.

        :param org_dn: search base; defaults to BASE_DN when empty.
        :return: list of DN strings.
        """
        base_dn = org_dn or LDAP_CONFIG["BASE_DN"]
        results = self.connection.search_s(
            base_dn, ldap.SCOPE_SUBTREE, '(&(objectclass=person))'
        )
        # Keep only the DN of each result.  Results without a DN (python-ldap
        # reports search references that way) are skipped because callers
        # treat every element as a string.
        return [entry[0] for entry in results if entry[0]]

    def add_users_to_group(self, ou, group_name):
        """
        Add every user returned by :meth:`get_ldap_users_dn` to a group in
        one modify operation, then close the connection.

        :param ou: organizational unit that holds the group.
        :param group_name: group name as a string, e.g. ``"groupname"``.
        :return: True on success, False if an LDAP error occurred.

        The modify list may contain several operations:
          MOD_ADD: if the attribute can hold several values, the new value
              is added and existing values are kept.
          MOD_DELETE: if the value exists it is removed.
          MOD_REPLACE: all existing values are removed and the new value is set.
        Example:
            [(ldap.MOD_ADD, 'memberUid', 'user1'),
             (ldap.MOD_DELETE, 'memberUid', 'user3')]
        """
        # Built before the try so the error handler can always reference it.
        group_dn = "cn=%s,ou=%s,%s" % (group_name, ou, LDAP_CONFIG["BASE_DN"])
        try:
            mods = [
                (ldap.MOD_ADD, 'member', [user_dn.encode("utf-8")])
                for user_dn in self.get_ldap_users_dn()
            ]
            self.connection.modify_s(group_dn, mods)
            # Close and reset so a later call on this instance reconnects
            # instead of reusing a connection that has been unbound.
            self._close()
            return True
        except ldap.LDAPError as e:
            print("%s update users failed,reason: %s" % (group_dn, str(e)))
        return False

    def add_user_to_group(self, ou, group_name):
        """
        Add every user returned by :meth:`get_ldap_users_dn` to a group, one
        modify operation per user, then close the connection.

        Users that are already members are reported and skipped.

        :param ou: organizational unit that holds the group.
        :param group_name: group name as a string, e.g. ``"groupname"``.
        :return: True when the loop completed, False if an LDAP error
            other than "value already exists" occurred.

        The modify list may contain several operations:
          MOD_ADD: if the attribute can hold several values, the new value
              is added and existing values are kept.
          MOD_DELETE: if the value exists it is removed.
          MOD_REPLACE: all existing values are removed and the new value is set.
        Example:
            [(ldap.MOD_ADD, 'memberUid', 'user1'),
             (ldap.MOD_DELETE, 'memberUid', 'user3')]
        """
        # Built before the try so the error handlers can always reference it.
        group_dn = "cn=%s,ou=%s,%s" % (group_name, ou, LDAP_CONFIG["BASE_DN"])
        try:
            conn = self.connection
            for user_dn in self.get_ldap_users_dn():
                try:
                    conn.modify_s(
                        group_dn,
                        [(ldap.MOD_ADD, 'member', [user_dn.encode("utf-8")])],
                    )
                    print(f"{group_dn} successfully added user:{user_dn}")
                except ldap.TYPE_OR_VALUE_EXISTS as e:
                    # Already a member: report it and carry on with the rest.
                    print("%s update users failed,reason: %s" % (group_dn, str(e)))
            # Close and reset so a later call on this instance reconnects
            # instead of reusing a connection that has been unbound.
            self._close()
            return True
        except ldap.LDAPError as e:
            print("%s update users failed,reason: %s" % (group_dn, str(e)))
        return False


def main():
    """Demo entry point: add each user found under BASE_DN to group gp003, one by one."""
    backend = LDAPBackend()

    # Authenticate a user
    # users = backend.authenticate('admin', 'admin')

    # Create a user
    # backend.create_ldap_user(username="ln05", password="example123", ou="Users")

    # Create an ou
    # backend.create_ldap_ou(ou='MGroups')

    # Create a group under an ou
    # backend.create_ldap_group(ou='MGroups', gpname="gp004")

    # List all users in the directory (searches the whole tree by default)
    # users = backend.get_ldap_users_dn()
    # for u in users:
    #     print(type(u), u)

    # Add users to a group in one batch
    # backend.add_users_to_group(ou='MGroups',  group_name='gp001')

    # Add users to a group one at a time
    backend.add_user_to_group(ou='MGroups',  group_name='gp003')


if __name__ == "__main__":
    main()

参考文档

  1. https://www.cnblogs.com/linxiyue/p/10250243.html
  2. https://codingdict.com/sources/py/ldap/18943.html #开源实例
  3. https://blog.51cto.com/u_14207158/2352634 #AD/LDAP
  4. https://www.cnblogs.com/huiyichanmian/p/12600710.html #用户认证
  5. https://my.oschina.net/shawnplaying/blog/733338 #LDAP3
 类似资料: