I am trying to use the Kafka Connect REST API to manage connectors. For simplicity, consider the following pause implementation:
def pause(): Unit = {
  logger.info(s"pause() Triggered")
  val response = HttpClient.newHttpClient.send({
    HttpRequest
      .newBuilder(URI.create(config.connectUrl + s"/connectors/${config.connectorName}/pause"))
      .PUT(BodyPublishers.noBody)
      .timeout(Duration.ofMillis(config.timeout.toMillis.toInt))
      .build()
  }, BodyHandlers.ofString)
  if (response.statusCode() != HTTPStatus.Accepted) {
    throw new Exception(s"Could not pause connector: ${response.body}")
  }
}
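Because I am using KafkaConnector as a resource, I cannot use the Kafka Connect REST API: the connector operator treats the KafkaConnector resource as its single source of truth, so manual changes made directly through the Kafka Connect REST API (such as pause) are reverted by the Cluster Operator.
So, to pause the connector, I need to edit the resource somehow.
I am struggling to change the logic of my current function, and it would be great to have some practical examples of how to work with the KafkaConnector resource.
I looked at the Using Strimzi documentation but could not find any practical example.
Thanks!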
With @Jakub's help, I managed to create my new client:
class KubernetesService(config: Configuration) extends StrictLogging {
  private[this] val client = new DefaultKubernetesClient(Config.autoConfigure(config.connectorContext))

  def setPause(pause: Boolean): Unit = {
    logger.info(s"[KubernetesService] - setPause($pause) Triggered")
    val connector = getConnector()
    connector.getSpec.setPause(pause)
    Crds.kafkaConnectorOperation(client).inNamespace(config.connectorNamespace).withName(config.connectorName).replace(connector)
    Crds.kafkaConnectorOperation(client)
      .inNamespace(config.connectorNamespace)
      .withName(config.connectorName)
      .waitUntilCondition(connector => {
        connector != null &&
          connector.getSpec.getPause == pause && {
            val desiredState = if (pause) "Paused" else "Running"
            connector.getStatus.getConditions.stream().anyMatch(_.getType.equalsIgnoreCase(desiredState))
          }
      }, config.timeout.toMillis, TimeUnit.MILLISECONDS)
  }

  def delete(): Unit = {
    logger.info(s"[KubernetesService] - delete() Triggered")
    Crds.kafkaConnectorOperation(client).inNamespace(config.connectorNamespace).withName(config.connectorName).delete
    Crds.kafkaConnectorOperation(client)
      .inNamespace(config.connectorNamespace)
      .withName(config.connectorName)
      .waitUntilCondition(_ == null, config.timeout.toMillis, TimeUnit.MILLISECONDS)
  }

  def create(oldKafkaConnect: KafkaConnector): Unit = {
    logger.info(s"[KubernetesService] - create(${oldKafkaConnect.getMetadata}) Triggered")
    Crds.kafkaConnectorOperation(client).inNamespace(config.connectorNamespace).withName(config.connectorName).create(oldKafkaConnect)
    Crds.kafkaConnectorOperation(client)
      .inNamespace(config.connectorNamespace)
      .withName(config.connectorName)
      .waitUntilCondition(connector => {
        connector != null &&
          connector.getStatus.getConditions.stream().anyMatch(_.getType.equalsIgnoreCase("Running"))
      }, config.timeout.toMillis, TimeUnit.MILLISECONDS)
  }

  def getConnector(): KafkaConnector = {
    logger.info(s"[KubernetesService] - getConnector() Triggered")
    Try {
      Crds.kafkaConnectorOperation(client).inNamespace(config.connectorNamespace).withName(config.connectorName).get
    } match {
      case Success(connector) => connector
      case Failure(_: NullPointerException) =>
        throw new NullPointerException(s"Failure on getConnector(${config.connectorName}) on ns: ${config.connectorNamespace}, context: ${config.connectorContext}")
      case Failure(exception) => throw exception
    }
  }
}
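One thing to watch in the wait conditions above: shortly after a resource is created or replaced, its status may not have been populated yet, so dereferencing the status and its conditions directly inside the predicate can throw a NullPointerException. Below is a minimal null-safe sketch of that check. It is written in plain Java against a hypothetical Condition class standing in for the real Strimzi condition type, so it runs without the Strimzi or Fabric8 libraries; the class and method names are assumptions made for illustration.

```java
import java.util.List;

public class NullSafeConditionCheck {

    // Hypothetical stand-in for a status condition; only the type is modelled.
    static final class Condition {
        private final String type;

        Condition(String type) {
            this.type = type;
        }

        String getType() {
            return type;
        }
    }

    // Returns true only when the list is present and some entry has the
    // desired type (case-insensitive). A missing list, a null entry, or a
    // null type counts as "not matched" instead of throwing.
    static boolean hasCondition(List<Condition> conditions, String desiredType) {
        if (conditions == null || desiredType == null) {
            return false;
        }
        return conditions.stream()
                .anyMatch(c -> c != null
                        && c.getType() != null
                        && c.getType().equalsIgnoreCase(desiredType));
    }

    public static void main(String[] args) {
        // Status not populated yet: the check reports "not matched".
        System.out.println(hasCondition(null, "Paused"));
        // Status populated with a matching condition type.
        System.out.println(hasCondition(List.of(new Condition("Paused")), "Paused"));
    }
}
```

In the Scala client the same idea would amount to checking that getStatus and getConditions are non-null before streaming over the conditions.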
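To pause the connector, you can edit the KafkaConnector resource and set the pause field in .spec to true (see the docs). There are several ways to do this. You can use kubectl and apply new YAML from a file (kubectl apply), or use kubectl edit to do it interactively.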
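If you want to do it programmatically, you will need a Kubernetes client to edit the resource. In Java, you can also use Strimzi's api module, which has all the structures needed for editing the resources. I wrote a simple example that pauses a Kafka connector in Java using the Fabric8 Kubernetes client and the api module: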
package cz.scholz.strimzi.api.examples;

import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.MixedOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.strimzi.api.kafka.Crds;
import io.strimzi.api.kafka.KafkaConnectorList;
import io.strimzi.api.kafka.model.KafkaConnector;

public class PauseConnector {
    public static void main(String[] args) {
        String namespace = "myproject";
        String crName = "my-connector";

        KubernetesClient client = new DefaultKubernetesClient();
        MixedOperation<KafkaConnector, KafkaConnectorList, Resource<KafkaConnector>> op = Crds.kafkaConnectorOperation(client);

        KafkaConnector connector = op.inNamespace(namespace).withName(crName).get();
        connector.getSpec().setPause(true);
        op.inNamespace(namespace).withName(crName).replace(connector);

        client.close();
    }
}
(The full project is at https://github.com/scholzj/strimzi-api-examples)
I am not a Scala user, but I expect this works from Scala as well; I will leave rewriting it from Java to Scala to you.