我已经设置了Kafka和zookeeper认证与SASL ACL和Kafka生产者和消费者的SSL双向认证,包括加密。
通过启用Kafka和zookeeper之间的SASL和ACL,它不允许未经授权的Kafka代理登录到zookeeper集群。但是,主题的创建和删除可以不受任何限制。
动物园管理员. properties
dataDir=/x02/lsesv2-s/data/Zookeeper
clientPort=15300
tickTime=2000
initLimit=10
syncLimit=5
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
quorum.auth.enableSasl=true
quorum.auth.learnerRequireSasl=true
quorum.auth.serverRequireSasl=true
quorum.auth.learner.loginContext=QuorumLearner
quorum.auth.server.loginContext=QuorumServer
server.1=172.25.33.12:15302:15301
server.2=172.25.33.13:15302:15301
server.3=172.25.33.11:15302:15301
zookeeper_jaas.conf
Server {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="admin"
password="abc123"
user_admin="abc123";
};
QuorumServer {
org.apache.zookeeper.server.auth.DigestLoginModule required
user_admin="abc123";
};
QuorumLearner {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="admin"
password="abc123";
};
通过以下代码设置ACL
final CountDownLatch connectedSignal = new CountDownLatch(1);
String connect = "localhost:15300";
ZooKeeper zooKeeper = null;
try
{
String userName = "admin";
String password = "mit123";
zooKeeper = new ZooKeeper(connect, 5000, we ->
{
if (we.getState() == Watcher.Event.KeeperState.SyncConnected)
{
connectedSignal.countDown();
}
});
connectedSignal.await();
zooKeeper.addAuthInfo("digest", (userName + ":" + password).getBytes());
final String aclString = "auth:" + userName + ":" + password + ":" + "cdrwa" +
",sasl:" + userName + ":" + "cdrwa";
zooKeeper.setACL("/", parseACLs(aclString), -1);
} finally
{
if (zooKeeper != null)
{
zooKeeper.close();
}
}
上面的代码正在工作,下面是执行代码后的结果。
Welcome to ZooKeeper!
JLine support is disabled
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
getAcl /
'sasl,'admin
: cdrwa
'digest,'admin:oiasY+rmnmmK9mec8kpnvv281HE=
: cdrwa
而不是服务器。属性文件我在启动时覆盖了Kafka属性*
Kafka财产
kafka/bin/kafka-server-start.sh /x02/lsesv2-s/current/kafka/config/server.properties
--override broker.id=1
--override zookeeper.connect=10g-flton-onl01:15300,10g-flton-onl02:15300,10g-flton-nor02:15300
--override num.network.threads=16
--override num.io.threads=16
--override socket.send.buffer.bytes=10240000
--override socket.receive.buffer.bytes=10240000
--override log.dirs=/x02/lsesv2-s/data/Kafka
--override offsets.topic.replication.factor=1
--override min.insync.replicas=1
--override inter.broker.listener.name=INTERNAL
--override listeners=INTERNAL://10g-flton-onl01:15307
--override advertised.html" target="_blank">listeners=INTERNAL://10g-flton-onl01:15307
--override listener.security.protocol.map=INTERNAL:SSL
--override security.protocol=SSL
--override ssl.client.auth=required
--override ssl.key.password=abc123
--override ssl.keystore.location=configs/MHV/kafka.server.keystore.jks
--override ssl.keystore.password=abc123
--override ssl.truststore.location=configs/MHV/kafka.server.truststore.jks
--override ssl.truststore.password=abc123
--override ssl.endpoint.identification.algorithm=
Kafka对生产者/消费者的认证工作正常,动物园管理员对Kafka的认证工作也正常。但是,未经授权的用户也可以创建和删除主题。
主题创建
kafka/bin/kafka-topics.sh --create --zookeeper localhost:15300 --replication-factor 3 --partitions 8 --topic test
话题删除
kafka/bin/kafka-topics.sh --zookeeper localhost:15300 --delete --topic test
注意:我在创建或删除主题时没有设置-DJ ava . security . auth . log in . config = Kafka _ server _ JAAS . conf。所以这个操作要限制。但实际上,并没有。
只为授权用户帮助我创建和删除主题。
这似乎是本地测试所需的属性。
KAFKA_ZOOKEEPER_SET_ACL: "true"
对于汇合的图像或地图也是如此。
zookeeper.set.acl
参考
也如Kafka101汇合所述
存储在ZooKeeper中的元数据是这样的,只有代理才能修改相应的znode,但znode是世界可读的。
因为我们将ZooKeeper配置为需要SASL身份验证,所以需要设置java.security.auth.login。启动kafka主题工具时配置系统属性:
这里显示了一个代码示例和docker-compose文件
本文向大家介绍mysql5.7创建用户授权删除用户撤销授权,包括了mysql5.7创建用户授权删除用户撤销授权的使用技巧和注意事项,需要的朋友参考一下 一, 创建用户: 命令: 说明:username - 你将创建的用户名, host - 指定该用户在哪个主机上可以登陆,如果是本地用户可用localhost, 如果想让该用户可以从任意远程主机登陆,可以使用通配符%. p
连接MySQL操作 mysql -h 主机地址 -u 用户名 -p 用户密码 注:-u与root可以不用加空格,其它参数也一样。 DEMO 打开电脑CMD,输入 mysql -h 127.0.0.1 -u root -p 回车,然后输入密码。就可以连接到本地的MySQL数据库。 创建MySQL用户 CREATE USER 'username'@'host' IDENTIFIED BY 'passw
我有几个Kafka的题目作为测试。现在我想通过清理我的Kafka主题列表来把它们全部除掉。我设置了变量,然后停止并重新启动zookeeper和kafka服务器。但什么也帮不了我。主题仍然存在,“标记为删除”。我读了这个问题,但没有找到任何答案。否则,这里建议手动移除任何主题。但我该怎么做呢?在故事的结尾,手动或通过命令行,我如何永久删除Kafka主题?
我有这样的回应,当谈到检查用户未经授权。 有没有可能从未经授权的响应中删除路径?因为它没有为用户提供有价值的信息 }
主题的最后状态: 组,主题,分区,当前偏移量,日志结束偏移量,滞后,所有者G.AB_KAFF,T.AB_KAFF,0,5,5,0,Consumer-2_/127.0.0.1 G.AB_KAFF,T.AB_KAFF,1,5,5,0,Consumer-2_/127.0.0.1 现在我删除主题。并将邮件发布到此主题 那么为什么Kafka要把滞后设定为-ve数。这难道不是一个潜在的问题吗? 假设我再次订阅
需要创建一个新的用户组,允许在hybris中删除客户。 我们需要一个有权删除客户的用户组