confluent-kafka-dotnet is Confluent's .NET client for Apache Kafka and the Confluent Platform.
Features:
High performance - confluent-kafka-dotnet is a lightweight wrapper around librdkafka, a finely tuned C client.
Reliability - There are a lot of details to get right when writing an Apache Kafka client. We get them right in one place (librdkafka) and leverage this work across all of our clients (also confluent-kafka-python and confluent-kafka-go).
Supported - Commercial support is offered by Confluent.
Future proof - Confluent, founded by the creators of Kafka, is building a streaming platform with Apache Kafka at its core. It's a high priority for us that client features keep pace with core Apache Kafka and components of the Confluent Platform.
confluent-kafka-dotnet is derived from Andreas Heider's rdkafka-dotnet. We're fans of his work and were very happy to have been able to leverage rdkafka-dotnet as the basis of this client. Thanks Andreas!
confluent-kafka-dotnet is distributed via NuGet. We provide five packages:
Confluent.Kafka - the core client library.
Confluent.SchemaRegistry.Serdes.Avro - provides a serializer and deserializer for working with Avro serialized data with Confluent Schema Registry integration.
Confluent.SchemaRegistry.Serdes.Protobuf - provides a serializer and deserializer for working with Protobuf serialized data with Confluent Schema Registry integration.
Confluent.SchemaRegistry.Serdes.Json - provides a serializer and deserializer for working with JSON serialized data with Confluent Schema Registry integration.
Confluent.SchemaRegistry - the Confluent Schema Registry client (a dependency of the Confluent.SchemaRegistry.Serdes packages).
To install Confluent.Kafka from within Visual Studio, search for Confluent.Kafka in the NuGet Package Manager UI, or run the following command in the Package Manager Console:
Install-Package Confluent.Kafka -Version 1.8.1
To add a reference to a .NET Core project, execute the following at the command line:
dotnet add package -v 1.8.1 Confluent.Kafka
Note: Confluent.Kafka depends on the librdkafka.redist package, which provides a number of different builds of librdkafka that are compatible with common platforms. If you are on one of these platforms this will all work seamlessly (and you don't need to explicitly reference librdkafka.redist). If you are on a different platform, you may need to build librdkafka manually (or acquire it via other means) and load it using the Library.Load method.
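For illustration, here is a minimal sketch of loading a manually built librdkafka before any client is constructed (the library path shown is hypothetical):

using System;
using Confluent.Kafka;

class Program
{
    public static void Main(string[] args)
    {
        // Library.Load must be called before any producer, consumer or
        // admin client is instantiated.
        // assumption: hypothetical path to a manually built librdkafka.
        Library.Load("/opt/librdkafka/lib/librdkafka.so");

        Console.WriteLine($"librdkafka version: {Library.VersionString}");
    }
}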
Nuget packages corresponding to all commits to release branches are available from the following nuget package source (Note: this is not a web URL - you should specify it in the nuget package manager): https://ci.appveyor.com/nuget/confluent-kafka-dotnet. The version suffix of these nuget packages matches the appveyor build number. You can see which commit a particular build number corresponds to by looking at the AppVeyor build history.
Take a look in the examples directory for example usage. The integration tests also serve as good examples.
For an overview of configuration properties, refer to the librdkafka documentation.
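As a quick illustration (the property names and values below are arbitrary examples, not recommendations), librdkafka properties can be set either through the strongly typed config classes or by their raw librdkafka key names:

using Confluent.Kafka;

class Program
{
    public static void Main(string[] args)
    {
        // Strongly-typed properties map 1:1 to librdkafka configuration keys.
        var config = new ProducerConfig
        {
            BootstrapServers = "localhost:9092",  // bootstrap.servers
            LingerMs = 5,                         // linger.ms
            CompressionType = CompressionType.Lz4 // compression.type
        };

        // Any librdkafka property can also be set by its raw key name.
        config.Set("message.max.bytes", "1000000");
    }
}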
You should use the ProduceAsync method if you would like to wait for the result of your produce requests before proceeding. You might typically want to do this in highly concurrent scenarios, for example in the context of handling web requests. Behind the scenes, the client will manage optimizing communication with the Kafka brokers for you, batching requests as appropriate.
using System;
using System.Threading.Tasks;
using Confluent.Kafka;
class Program
{
    public static async Task Main(string[] args)
    {
        var config = new ProducerConfig { BootstrapServers = "localhost:9092" };

        // If serializers are not specified, default serializers from
        // `Confluent.Kafka.Serializers` will be automatically used where
        // available. Note: by default strings are encoded as UTF8.
        using (var p = new ProducerBuilder<Null, string>(config).Build())
        {
            try
            {
                var dr = await p.ProduceAsync("test-topic", new Message<Null, string> { Value = "test" });
                Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
            }
            catch (ProduceException<Null, string> e)
            {
                Console.WriteLine($"Delivery failed: {e.Error.Reason}");
            }
        }
    }
}
Note that a server round-trip is slow (3ms at a minimum; actual latency depends on many factors). In highly concurrent scenarios you will achieve high overall throughput out of the producer using the above approach, but there will be a delay on each await call. In stream processing applications, where you would like to process many messages in rapid succession, you would typically use the Produce method instead:
using System;
using Confluent.Kafka;
class Program
{
    public static void Main(string[] args)
    {
        var conf = new ProducerConfig { BootstrapServers = "localhost:9092" };

        Action<DeliveryReport<Null, string>> handler = r =>
            Console.WriteLine(!r.Error.IsError
                ? $"Delivered message to {r.TopicPartitionOffset}"
                : $"Delivery Error: {r.Error.Reason}");

        using (var p = new ProducerBuilder<Null, string>(conf).Build())
        {
            for (int i = 0; i < 100; ++i)
            {
                p.Produce("my-topic", new Message<Null, string> { Value = i.ToString() }, handler);
            }

            // wait for up to 10 seconds for any inflight messages to be delivered.
            p.Flush(TimeSpan.FromSeconds(10));
        }
    }
}
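The following example demonstrates a basic consumer, polling for messages in a loop until the process is interrupted: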
using System;
using System.Threading;
using Confluent.Kafka;
class Program
{
    public static void Main(string[] args)
    {
        var conf = new ConsumerConfig
        {
            GroupId = "test-consumer-group",
            BootstrapServers = "localhost:9092",
            // Note: The AutoOffsetReset property determines the start offset in the event
            // there are not yet any committed offsets for the consumer group for the
            // topic/partitions of interest. By default, offsets are committed
            // automatically, so in this example, consumption will only start from the
            // earliest message in the topic 'my-topic' the first time you run the program.
            AutoOffsetReset = AutoOffsetReset.Earliest
        };

        using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
        {
            c.Subscribe("my-topic");

            CancellationTokenSource cts = new CancellationTokenSource();
            Console.CancelKeyPress += (_, e) =>
            {
                e.Cancel = true; // prevent the process from terminating.
                cts.Cancel();
            };

            try
            {
                while (true)
                {
                    try
                    {
                        var cr = c.Consume(cts.Token);
                        Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Error occurred: {e.Error.Reason}");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Ensure the consumer leaves the group cleanly and final offsets are committed.
                c.Close();
            }
        }
    }
}
The Web example demonstrates how to integrate Apache Kafka with a web application, including how to implement IHostedService to realize a long running consumer poll loop, how to register a producer as a singleton service, and how to bind configuration from an injected IConfiguration instance.
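A minimal sketch of the hosted-service consumer pattern, assuming a local broker and hypothetical topic and group names (the actual Web example differs in detail):

using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;

// Hypothetical hosted service for illustration only.
public class KafkaConsumerService : BackgroundService
{
    private readonly IConsumer<Ignore, string> consumer;

    public KafkaConsumerService()
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092", // assumption: local broker.
            GroupId = "web-consumer-group"       // assumption: arbitrary group id.
        };
        consumer = new ConsumerBuilder<Ignore, string>(config).Build();
    }

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
        // Consume blocks, so run the poll loop on a dedicated task rather
        // than directly on the host's startup path.
        => Task.Run(() =>
        {
            consumer.Subscribe("my-topic");
            try
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    var cr = consumer.Consume(stoppingToken);
                    // ... hand cr.Message.Value off to application logic ...
                }
            }
            catch (OperationCanceledException) { /* shutting down. */ }
            finally
            {
                consumer.Close(); // leave the group cleanly.
            }
        }, stoppingToken);
}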
The .NET Client has full support for transactions and idempotent message production, allowing you to write horizontally scalable stream processing applications with exactly once semantics. The ExactlyOnce example demonstrates this capability by way of an implementation of the classic "word count" problem, also demonstrating how to use the FASTER Key/Value store (similar to RocksDb) to materialize working state that may be larger than available memory, and incremental rebalancing to avoid stop-the-world rebalancing operations and unnecessary reloading of state when you add or remove processing nodes.
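A minimal sketch of the transactional producer API, assuming a local broker and hypothetical topic and transactional ids (see the ExactlyOnce example for a complete application):

using System;
using Confluent.Kafka;

class TransactionalExample
{
    public static void Main(string[] args)
    {
        var config = new ProducerConfig
        {
            BootstrapServers = "localhost:9092",    // assumption: local broker.
            TransactionalId = "word-count-producer" // a stable id, unique per logical producer.
        };

        using (var p = new ProducerBuilder<string, string>(config).Build())
        {
            // Register the transactional id and fence any earlier instances.
            p.InitTransactions(TimeSpan.FromSeconds(30));

            p.BeginTransaction();
            p.Produce("counts", new Message<string, string> { Key = "kafka", Value = "42" });
            // Either every message in the transaction becomes visible to
            // consumers, or none does. On failure, AbortTransaction discards
            // everything produced since BeginTransaction.
            p.CommitTransaction(TimeSpan.FromSeconds(30));
        }
    }
}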
The three "Serdes" packages provide serializers and deserializers for Avro, Protobuf and JSON with Confluent Schema Registry integration. The Confluent.SchemaRegistry nuget package provides a client for interfacing with Schema Registry's REST API.
Note: All three serialization formats are supported across Confluent Platform. They each make different tradeoffs, and you should use the one that best matches your requirements. Avro is well suited to the streaming data use-case, but the quality and maturity of the non-Java implementations lags that of Java - this is an important consideration. Protobuf and JSON both have great support in .NET.
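As a hedged sketch of the Schema Registry integration, using the JSON Serdes package (the User type, topic name and addresses are hypothetical; the examples directory contains complete versions for all three formats):

using System;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;

// Hypothetical message type for illustration.
class User { public string Name { get; set; } public int Age { get; set; } }

class Program
{
    public static async Task Main(string[] args)
    {
        var producerConfig = new ProducerConfig { BootstrapServers = "localhost:9092" };
        // assumption: Schema Registry is reachable at this address.
        var schemaRegistryConfig = new SchemaRegistryConfig { Url = "localhost:8081" };

        using (var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig))
        using (var p = new ProducerBuilder<Null, User>(producerConfig)
            // The serializer registers/validates the schema against Schema Registry.
            .SetValueSerializer(new JsonSerializer<User>(schemaRegistry))
            .Build())
        {
            await p.ProduceAsync("users", new Message<Null, User> { Value = new User { Name = "ann", Age = 27 } });
        }
    }
}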
Errors delivered to a client's error handler should be considered informational except when the IsFatal flag is set to true, indicating that the client is in an unrecoverable state. Currently, this can only happen on the producer, and only when enable.idempotence has been set to true. In all other scenarios, clients will attempt to recover from all errors automatically.
Although calling most methods on the clients will result in a fatal error if the client is in an unrecoverable state, you should generally only need to explicitly check for fatal errors in your error handler, and handle this scenario there.
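A sketch of an error handler that distinguishes fatal from informational errors (the broker address and handling logic are illustrative only):

using System;
using Confluent.Kafka;

class Program
{
    public static void Main(string[] args)
    {
        var config = new ProducerConfig
        {
            BootstrapServers = "localhost:9092",
            EnableIdempotence = true // fatal errors can currently only occur when idempotence is enabled.
        };

        using (var p = new ProducerBuilder<Null, string>(config)
            .SetErrorHandler((_, e) =>
            {
                if (e.IsFatal)
                {
                    // The producer instance can no longer be used; fail fast or recreate it.
                    Console.WriteLine($"FATAL: {e.Reason}");
                }
                else
                {
                    // Informational: the client will attempt to recover automatically.
                    Console.WriteLine($"Error: {e.Reason}");
                }
            })
            .Build())
        {
            // ... produce messages ...
        }
    }
}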
When using Produce, to determine whether a particular message has been successfully delivered to a cluster, check the Error field of the DeliveryReport during the delivery handler callback.
When using ProduceAsync, any delivery result other than NoError will cause the returned Task to be in the faulted state, with the Task.Exception field set to a ProduceException containing information about the message and error via the DeliveryResult and Error fields. Note: if you await the call, this means a ProduceException will be thrown.
All Consume errors will result in a ConsumeException with further information about the error and context available via the Error and ConsumeResult fields.
The Confluent Cloud example demonstrates how to configure the .NET client for use with Confluent Cloud.
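A hedged sketch of typical Confluent Cloud configuration (the endpoint and credential placeholders must be replaced with your own values and are left elided here):

using Confluent.Kafka;

class Program
{
    public static void Main(string[] args)
    {
        var config = new ProducerConfig
        {
            // assumption: placeholder bootstrap endpoint and API credentials.
            BootstrapServers = "<broker-endpoint>.confluent.cloud:9092",
            SecurityProtocol = SecurityProtocol.SaslSsl,
            SaslMechanism = SaslMechanism.Plain,
            SaslUsername = "<api-key>",
            SaslPassword = "<api-secret>"
        };

        using (var p = new ProducerBuilder<Null, string>(config).Build())
        {
            p.Produce("test-topic", new Message<Null, string> { Value = "hello cloud" });
            p.Flush();
        }
    }
}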
Instructions on building and testing confluent-kafka-dotnet can be found here.
Copyright (c) 2016-2019 Confluent Inc.
Copyright (c) 2015-2016 Andreas Heider
KAFKA is a registered trademark of The Apache Software Foundation and has been licensed for use by confluent-kafka-dotnet. confluent-kafka-dotnet has no affiliation with and is not endorsed by The Apache Software Foundation.