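# python-ldap is a Python library for working with LDAP; it can add, delete, modify and search LDAP data. Official documentation: https://www.python-ldap.org/en/latest/index.html
# Install with: pip install python-ldap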
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @File : testldap.py
# @Author: xxx.xxxxx
# @Date : 2022/3/3 16:34
# @Desc :
import os

import ldap
import ldap.dn
import ldap.modlist as modlist
# Connection settings used by LDAPBackend.
# Every value can be overridden through an environment variable so that real
# credentials do not have to be committed to source control; the defaults are
# the original local-test values.
LDAP_CONFIG = {
    "HOST": os.environ.get("LDAP_HOST", "ldap://127.0.0.1:389"),
    "USERNAME": os.environ.get("LDAP_USERNAME", "cn=admin,dc=example,dc=org"),
    "PASSWORD": os.environ.get("LDAP_PASSWORD", "admin"),
    "BASE_DN": os.environ.get("LDAP_BASE_DN", "dc=example,dc=org"),
}
class LDAPBackend(object):
    """Small helper around a python-ldap connection.

    Authenticates users with a simple bind and offers helpers that create
    users, organizational units and groups and add members to groups.
    Connection settings are read from the module-level ``LDAP_CONFIG``.
    """

    # Lazily created connection object; None until first use or after _close().
    _connection = None
    # True only while the connection is bound with the admin credentials.
    _connection_bound = False

    def authenticate(self, username=None, passwd=None, **kwargs):
        """Check *username* / *passwd* by binding as that user.

        :param username: value of the user's ``cn``.
        :param passwd: the user's password.
        :return: ``"pass"`` when the bind succeeds, otherwise ``None``.
        """
        # Missing credentials are rejected locally and never sent to the server.
        if not username or not passwd:
            return None
        if self._authenticate_user_dn(username, passwd):
            return "pass"
        return None

    @property
    def connection(self):
        """Return the connection, binding as admin first if necessary."""
        if not self._connection_bound:
            self._bind()
        return self._get_connection()

    def _bind(self):
        """Bind with the admin credentials from ``LDAP_CONFIG``."""
        self._bind_as(LDAP_CONFIG['USERNAME'], LDAP_CONFIG['PASSWORD'], True)

    def _bind_as(self, bind_dn, bind_password, sticky=False):
        """Bind as *bind_dn*; *sticky* marks the bind as the admin bind."""
        # Clear the flag before binding: if the bind raises, the connection
        # must not be treated as admin-bound, so the next access to
        # ``connection`` redoes the admin bind.
        self._connection_bound = False
        self._get_connection().simple_bind_s(bind_dn, bind_password)
        self._connection_bound = sticky

    def _get_connection(self):
        """Return the cached connection, creating it on first use."""
        if self._connection is None:
            self._connection = ldap.initialize(LDAP_CONFIG['HOST'])
        return self._connection

    def _close(self):
        """Unbind and forget the connection so the next use reconnects."""
        conn, self._connection = self._connection, None
        self._connection_bound = False
        if conn is None:
            return
        try:
            conn.unbind_s()
        except ldap.LDAPError:
            # The connection is being discarded anyway.
            pass

    @staticmethod
    def _escape_rdn(value):
        """Return *value* as text that is safe to embed in a DN."""
        return ldap.dn.escape_dn_chars(str(value))

    @staticmethod
    def _encode_attrs(attrs):
        """Return a copy of *attrs* whose values are lists with str items
        encoded to UTF-8 bytes (the form python-ldap expects for writes)."""
        encoded = {}
        for key, values in attrs.items():
            if not isinstance(values, (list, tuple)):
                values = [values]
            encoded[key] = [
                v.encode("utf-8") if isinstance(v, str) else v for v in values
            ]
        return encoded

    def _group_dn(self, ou, group_name):
        """Build ``cn=<group_name>,ou=<ou>,<BASE_DN>``."""
        base_dn = LDAP_CONFIG["BASE_DN"]
        return f"cn={self._escape_rdn(group_name)},ou={self._escape_rdn(ou)},{base_dn}"

    def _authenticate_user_dn(self, username, passwd):
        """Return True if binding as ``cn=<username>,<BASE_DN>`` succeeds."""
        # The user name is external input: escape it so that it cannot change
        # the structure of the DN.
        # NOTE(review): create_ldap_user places users under
        # "cn=<ou>,<BASE_DN>", which this DN does not match - confirm the
        # intended directory layout.
        base_dn = LDAP_CONFIG['BASE_DN']
        bind_dn = f"cn={self._escape_rdn(username)},{base_dn}"
        try:
            self._bind_as(bind_dn, passwd, False)
            return True
        except ldap.INVALID_CREDENTIALS:
            return False

    def _get_or_create_user(self, username, passwd):
        """Placeholder for fetching or creating a local user; does nothing."""
        pass

    def create_ldap_user(self, username, password, ou):
        """Create an inetOrgPerson entry ``cn=<username>,cn=<ou>,<BASE_DN>``.

        :param username: used for both ``cn`` and ``sn``.
        :param password: stored in ``userPassword``.
        :param ou: name of the ``cn=`` container the user is created in.
        :return: the result of ``add_s`` on success; ``None`` when the entry
            already exists or the add fails (the reason is printed).
        """
        conn = self.connection
        base_dn = LDAP_CONFIG["BASE_DN"]
        # Built before the try block so the handlers can always refer to it.
        user_dn = f"cn={self._escape_rdn(username)},cn={self._escape_rdn(ou)},{base_dn}"
        try:
            entry = self._encode_attrs({
                'objectclass': ["top", "person", "organizationalPerson", "inetOrgPerson"],
                'cn': username,
                'sn': username,
                'userPassword': password,
            })
            ret = conn.add_s(user_dn, modlist.addModlist(entry))
        except ldap.ALREADY_EXISTS:
            print(f"dn={user_dn} is Already exists")
            return None
        except Exception as e:
            # Best effort by design: report the problem and return None.
            print(e)
            return None
        print(f"dn={user_dn} is successfully created.")
        return ret

    def create_ldap_ou(self, ou):
        """Create the organizational unit ``ou=<ou>,<BASE_DN>``.

        :param ou: name of the organizational unit.
        :return: the result of ``add_s``; LDAP errors propagate to the caller.
        """
        base_dn = LDAP_CONFIG["BASE_DN"]
        ou_dn = f"ou={self._escape_rdn(ou)},{base_dn}"
        entry = self._encode_attrs({
            'objectclass': ["top", "organizationalUnit"],
            'ou': ou,
        })
        ret = self.connection.add_s(ou_dn, modlist.addModlist(entry))
        print(f"dn={ou_dn} is successfully created.")
        return ret

    def create_ldap_group(self, ou, gpname,
                          member="cn=luonan,cn=Users,dc=example,dc=org"):
        """Create the groupOfNames entry ``cn=<gpname>,ou=<ou>,<BASE_DN>``.

        :param ou: organizational unit the group is created in.
        :param gpname: ``cn`` of the group.
        :param member: DN (or list of DNs) of the initial member(s); a
            groupOfNames entry must have a ``member`` attribute. Defaults to
            the DN that used to be hard-coded here.
        :return: the result of ``add_s``; LDAP errors propagate to the caller.
        """
        group_dn = self._group_dn(ou, gpname)
        entry = self._encode_attrs({
            'objectclass': ["top", "groupOfNames"],
            'cn': gpname,
            'member': member,
        })
        ret = self.connection.add_s(group_dn, modlist.addModlist(entry))
        print(f"group:{group_dn} successfully created!")
        return ret

    def get_ldap_users_dn(self, org_dn=''):
        """Return the DNs of all ``person`` entries below *org_dn*.

        :param org_dn: search base; ``LDAP_CONFIG["BASE_DN"]`` when empty.
        :return: list of DN strings.
        """
        search_base = org_dn or LDAP_CONFIG["BASE_DN"]
        results = self.connection.search_s(
            search_base, ldap.SCOPE_SUBTREE, '(&(objectclass=person))'
        )
        # Keep only the DN of each result; results without a DN (search
        # references) are skipped because they are not entries.
        return [result[0] for result in results if result[0]]

    def add_users_to_group(self, ou, group_name):
        """Add every user returned by get_ldap_users_dn() to a group at once.

        All values are sent in a single modify request, so any LDAP error
        makes the whole call return False. The connection is closed afterwards.

        Each modification is a ``(mod_op, attribute, value)`` tuple where:
          * MOD_ADD: the value is added and existing values are kept;
          * MOD_DELETE: the value is removed if present;
          * MOD_REPLACE: all existing values are replaced by the new value.

        :param ou: organizational unit that contains the group.
        :param group_name: ``cn`` of the group, e.g. ``"groupname"``.
        :return: True on success, False when the modification fails.
        """
        # Built before the try block so the error handler can always use it.
        group_dn = self._group_dn(ou, group_name)
        users = self.get_ldap_users_dn()
        mods = [(ldap.MOD_ADD, 'member', dn.encode("utf-8")) for dn in users]
        try:
            if mods:  # nothing to send when no users were found
                self.connection.modify_s(group_dn, mods)
            return True
        except ldap.LDAPError as e:
            print(f"{group_dn} update users failed,reason: {e}")
            return False
        finally:
            # Reset the cached state too, so that this object can be reused.
            self._close()

    def add_user_to_group(self, ou, group_name):
        """Add every user returned by get_ldap_users_dn() to a group one by one.

        A user that is already a member is reported and skipped; any other
        LDAP error stops the loop and makes the call return False. The
        connection is closed afterwards.

        :param ou: organizational unit that contains the group.
        :param group_name: ``cn`` of the group, e.g. ``"groupname"``.
        :return: True when the loop completes, False on an LDAP error.
        """
        # Built before the try block so the error handlers can always use it.
        group_dn = self._group_dn(ou, group_name)
        try:
            conn = self.connection
            for user_dn in self.get_ldap_users_dn():
                change = [(ldap.MOD_ADD, 'member', user_dn.encode("utf-8"))]
                try:
                    conn.modify_s(group_dn, change)
                    print(f"{group_dn} successfully added user:{user_dn}")
                except ldap.TYPE_OR_VALUE_EXISTS as e:
                    print(f"{group_dn} update users failed,reason: {e}")
            return True
        except ldap.LDAPError as e:
            print(f"{group_dn} update users failed,reason: {e}")
            return False
        finally:
            # Reset the cached state too, so that this object can be reused.
            self._close()
if __name__ == "__main__":
    # Manual smoke test against the server configured in LDAP_CONFIG.
    backend = LDAPBackend()
    # Authenticate a user:
    # users = backend.authenticate('admin', 'admin')
    # Create a user:
    # backend.create_ldap_user(username="ln05", password="example123", ou="Users")
    # Create an organizational unit:
    # backend.create_ldap_ou(ou='MGroups')
    # Create a group inside an organizational unit:
    # backend.create_ldap_group(ou='MGroups', gpname="gp004")
    # List all users (searches the whole directory by default):
    # users = backend.get_ldap_users_dn()
    # for u in users:
    #     print(type(u), u)
    # Add all users to a group in one batch:
    # backend.add_users_to_group(ou='MGroups', group_name='gp001')
    # Add all users to a group one at a time:
    backend.add_user_to_group(ou='MGroups', group_name='gp003')