Lenses offers SQL (for data browsing and Kafka Streams), Kafka Connect connector management, cluster monitoring and more.
You can find out more at lenses.io
A collection of components for building a real-time ingestion pipeline.
The following connectors will be deprecated and no longer included in any release from 3.0 onwards.
Please take a moment to read the documentation and make sure the software prerequisites are met.
Connector | Type | Description | Docs |
---|---|---|---|
AWS S3 | Sink | Copy data from Kafka to AWS S3. | Docs |
AzureDocumentDb | Sink | Copy data from Kafka to Azure Document Db. | Docs |
Bloomberg | Source | Copy data from Bloomberg streams to Kafka. | Docs |
Cassandra | Source | Copy data from Cassandra to Kafka. | Docs |
*Cassandra | Sink | Certified DSE Cassandra, copy data from Kafka to Cassandra. | Docs |
Coap | Source | Copy data from IoT CoAP endpoints (using Californium) to Kafka. | Docs |
Coap | Sink | Copy data from Kafka to IoT CoAP endpoints. | Docs |
Elastic 6 | Sink | Copy data from Kafka to Elasticsearch 6.x via TCP or HTTP. | Docs |
FTP/HTTP | Source | Copy data from FTP/HTTP to Kafka. | Docs |
Hazelcast | Sink | Copy data from Kafka to Hazelcast. | Docs |
HBase | Sink | Copy data from Kafka to HBase. | Docs |
Hive | Source | Copy data from Hive/HDFS to Kafka. | Docs |
Hive | Sink | Copy data from Kafka to Hive/HDFS. | Docs |
InfluxDb | Sink | Copy data from Kafka to InfluxDB. | Docs |
Kudu | Sink | Copy data from Kafka to Kudu. | Docs |
JMS | Source | Copy data from JMS topics/queues to Kafka. | Docs |
JMS | Sink | Copy data from Kafka to JMS. | Docs |
MongoDB | Sink | Copy data from Kafka to MongoDB. | Docs |
MQTT | Source | Copy data from MQTT to Kafka. | Docs |
MQTT | Sink | Copy data from Kafka to MQTT. | Docs |
Pulsar | Source | Copy data from Pulsar to Kafka. | Docs |
Pulsar | Sink | Copy data from Kafka to Pulsar. | Docs |
Redis | Sink | Copy data from Kafka to Redis. | Docs |
ReThinkDB | Source | Copy data from RethinkDB to Kafka. | Docs |
ReThinkDB | Sink | Copy data from Kafka to RethinkDB. | Docs |
VoltDB | Sink | Copy data from Kafka to VoltDB. | Docs |
2.1.3
Move to connect-common 2.0.5, which adds complex type support to KCQL
2.1.2
2.1.0
2.0.1
Hive Source
Renamed connect.hive.hive.metastore to connect.hive.metastore
Renamed connect.hive.hive.metastore.uris to connect.hive.metastore.uris
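For reference, a minimal sketch of a Hive Source configuration using the renamed properties; the metastore type and URI shown here are illustrative assumptions, not defaults.

```
# Hive Source snippet using the renamed properties (values are placeholders)
connect.hive.metastore=thrift
connect.hive.metastore.uris=thrift://localhost:9083
```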
Fix Elastic start up NPE
Fix to correct batch size extraction from KCQL on Pulsar
2.0.0
Deprecated:
* Druid Sink (not Scala 2.12 compatible)
* Elastic Sink (not Scala 2.12 compatible)
* Elastic5 Sink (not Scala 2.12 compatible)
* RabbitMQ (not supported; the JMS connector can be used instead)
Redis
Cassandra
ReThinkDB
FTP Source
MQTT Source
1.2.7
Features
MQTT Source
Support dynamic topic names in Kafka from a wildcard subscription.
Example: INSERT INTO $ SELECT * FROM /mqttSourceTopic/+/test
If the MQTT topic is /mqttSourceTopic/A/test, this will result in the Kafka topic mqttSourceTopic_A_test.
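As an illustration of the wildcard subscription above, a minimal MQTT Source snippet might look as follows; the connector class and broker address are assumptions, and the KCQL mirrors the example.

```
# Hypothetical MQTT Source configuration for a wildcard subscription
connector.class=com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector
connect.mqtt.connection.hosts=tcp://localhost:1883
connect.mqtt.kcql=INSERT INTO $ SELECT * FROM /mqttSourceTopic/+/test
```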
Cassandra (source)
Support for sending JSON formatted messages (with a string key) to the Kafka topic.
A sample KCQL statement would look like:
INSERT INTO <topic> SELECT <fields> FROM <column_family> PK <PK_field> WITHFORMAT JSON WITHUNWRAP INCREMENTALMODE=<mode> WITHKEY(<key_field>)
This sends the selected fields' values as a JSON object to the specified topic.
Note that in the Kafka Connect properties you need to set key.converter and value.converter to org.apache.kafka.connect.storage.StringConverter.
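Putting the pieces above together, a sketch of the relevant properties might look like this; topic, table, column and mode names are placeholders.

```
# String converters plus a WITHFORMAT JSON / WITHKEY KCQL (placeholder names)
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
connect.cassandra.kcql=INSERT INTO orders-topic SELECT id, amount, created FROM orders PK created WITHFORMAT JSON WITHUNWRAP INCREMENTALMODE=TIMESTAMP WITHKEY(id)
```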
Added a new INCREMENTALMODE called dsesearchtimestamp that makes DSE Search queries using Solr instead of a native Cassandra query.
Instead of the native query:
SELECT a, b, c, d FROM keyspace.table WHERE pkCol > ? AND pkCol <= ? ALLOW FILTERING;
the connector now issues the following query when INCREMENTALMODE is dsesearchtimestamp:
SELECT a, b, c, d FROM keyspace.table WHERE solr_query=?;
where solr_query will be something like:
pkCol:{2020-03-23T15:02:21Z TO 2020-03-23T15:30:12.989Z]
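For example, a hedged KCQL sketch enabling this mode could look as follows; keyspace, table and column names are placeholders.

```
# Cassandra source KCQL using the dsesearchtimestamp incremental mode (placeholder names)
connect.cassandra.kcql=INSERT INTO orders-topic SELECT a, b, c, d FROM orders PK pkCol INCREMENTALMODE=dsesearchtimestamp
```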
AzureDocumentDB
Bug fixes
JMS Source
Allow for task parallelization and control over how the connector's tasks are parallelized.
Changes:
A new option, connect.jms.scale.type, controls how the work is parallelized. Available values are kcql and default. If kcql is used, parallelization is based on the number of KCQL statements written; otherwise it is driven by the connector's tasks.max value.
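A minimal sketch of the new option, assuming two placeholder queues; with scale type kcql the task count follows the number of KCQL statements rather than tasks.max.

```
# JMS Source scaling sketch (queue and topic names are placeholders)
tasks.max=2
connect.jms.scale.type=kcql
connect.jms.kcql=INSERT INTO topicA SELECT * FROM queueA WITHTYPE QUEUE;INSERT INTO topicB SELECT * FROM queueB WITHTYPE QUEUE
```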
Kudu Sink
Handle null decimal types correctly
Mongo Sink
Handle decimal types
1.2.4
Bug fixes
JMS Source
Acknowledging the JMS messages was not always possible. There was also an issue with messages being produced to Kafka out of order relative to the JMS queue.
Changes:
1.2.3
Features
Bug fixes
1.2.2
Features
Bug fixes
1.2.1
1.2.0
1.1.0
connect.mongodb.batch.size is deprecated
connect.mapping.collection.to.json treats maps, lists and sets as JSON when inserting into Cassandra
connect.rethink.batch.size is deprecated
INSERT INTO targetTopic SELECT * FROM mqttTopic ... WITHREGEX=`$THE_REGEX`
connect.elastic.retry.interval added to elastic5 and elastic6
DEFAULT UNSET to be added on insert; omitted columns from maps default to null. Alternatively, if UNSET is set, the pre-existing value will be preserved
connect.cassandra.batch.size is deprecated
1.0.0
0.4.0
ftp.protocol introduced, either ftp (default) or ftps.
0.3.0
0.2.6
Added connect.progress.enabled, which will periodically report log messages processed.
Renamed connect.documentdb.database to connect.documentdb.db
Renamed connect.documentdb.database.create to connect.documentdb.db.create
Renamed connect.cassandra.source.kcql to connect.cassandra.kcql
Renamed connect.cassandra.source.timestamp.type to connect.cassandra.timestamp.type
Renamed connect.cassandra.source.import.poll.interval to connect.cassandra.import.poll.interval
Renamed connect.cassandra.source.error.policy to connect.cassandra.error.policy
Renamed connect.cassandra.source.max.retries to connect.cassandra.max.retries
Renamed connect.cassandra.source.retry.interval to connect.cassandra.retry.interval
Renamed connect.cassandra.sink.kcql to connect.cassandra.kcql
Renamed connect.cassandra.sink.error.policy to connect.cassandra.error.policy
Renamed connect.cassandra.sink.max.retries to connect.cassandra.max.retries
Renamed connect.cassandra.sink.retry.interval to connect.cassandra.retry.interval
Renamed connect.coap.bind.port to connect.coap.port
Renamed connect.coap.bind.host to connect.coap.host
Renamed connect.mongo.database to connect.mongo.db
Renamed connect.mongo.sink.batch.size to connect.mongo.batch.size
Renamed connect.druid.sink.kcql to connect.druid.kcql
Renamed connect.druid.sink.conf.file to connect.druid.kcql
Renamed connect.druid.sink.write.timeout to connect.druid.write.timeout
Renamed connect.elastic.sink.kcql to connect.elastic.kcql
Renamed connect.hbase.sink.column.family to connect.hbase.column.family
Renamed connect.hbase.sink.kcql to connect.hbase.kcql
Renamed connect.hbase.sink.error.policy to connect.hbase.error.policy
Renamed connect.hbase.sink.max.retries to connect.hbase.max.retries
Renamed connect.hbase.sink.retry.interval to connect.hbase.retry.interval
Renamed connect.influx.sink.kcql to connect.influx.kcql
Renamed connect.influx.connection.user to connect.influx.username
Renamed connect.influx.connection.password to connect.influx.password
Renamed connect.influx.connection.database to connect.influx.db
Renamed connect.influx.connection.url to connect.influx.url
Renamed connect.kudu.sink.kcql to connect.kudu.kcql
Renamed connect.kudu.sink.error.policy to connect.kudu.error.policy
Renamed connect.kudu.sink.retry.interval to connect.kudu.retry.interval
Renamed connect.kudu.sink.max.retries to connect.kudu.max.retries
Renamed connect.kudu.sink.schema.registry.url to connect.kudu.schema.registry.url
Renamed connect.redis.connection.password to connect.redis.password
Renamed connect.redis.sink.kcql to connect.redis.kcql
Renamed connect.redis.connection.host to connect.redis.host
Renamed connect.redis.connection.port to connect.redis.port
Renamed connect.rethink.source.host to connect.rethink.host
Renamed connect.rethink.source.port to connect.rethink.port
Renamed connect.rethink.source.db to connect.rethink.db
Renamed connect.rethink.source.kcql to connect.rethink.kcql
Renamed connect.rethink.sink.host to connect.rethink.host
Renamed connect.rethink.sink.port to connect.rethink.port
Renamed connect.rethink.sink.db to connect.rethink.db
Renamed connect.rethink.sink.kcql to connect.rethink.kcql
Renamed connect.jms.user to connect.jms.username
Renamed connect.jms.source.converters to connect.jms.converters
Removed connect.jms.converters; replace it in KCQL with withConverters
Removed connect.jms.queues; replace it in KCQL with withType QUEUE
Removed connect.jms.topics; replace it in KCQL with withType TOPIC
Renamed connect.mqtt.source.kcql to connect.mqtt.kcql
Renamed connect.mqtt.user to connect.mqtt.username
Renamed connect.mqtt.hosts to connect.mqtt.connection.hosts
Removed connect.mqtt.converters; replace it in KCQL with withConverters
Removed connect.mqtt.queues; replace it in KCQL with withType=QUEUE
Removed connect.mqtt.topics; replace it in KCQL with withType=TOPIC
Renamed connect.hazelcast.sink.kcql to connect.hazelcast.kcql
Renamed connect.hazelcast.sink.group.name to connect.hazelcast.group.name
Renamed connect.hazelcast.sink.group.password to connect.hazelcast.group.password
Renamed connect.hazelcast.sink.cluster.members to connect.hazelcast.cluster.members
Renamed connect.hazelcast.sink.batch.size to connect.hazelcast.batch.size
Renamed connect.hazelcast.sink.error.policy to connect.hazelcast.error.policy
Renamed connect.hazelcast.sink.max.retries to connect.hazelcast.max.retries
Renamed connect.hazelcast.sink.retry.interval to connect.hazelcast.retry.interval
Renamed connect.volt.sink.kcql to connect.volt.kcql
Renamed connect.volt.sink.connection.servers to connect.volt.servers
Renamed connect.volt.sink.connection.user to connect.volt.username
Renamed connect.volt.sink.connection.password to connect.volt.password
Renamed connect.volt.sink.error.policy to connect.volt.error.policy
Renamed connect.volt.sink.max.retries to connect.volt.max.retries
Renamed connect.volt.sink.retry.interval to connect.volt.retry.interval
0.2.5 (8 Apr 2017)
withunwrap
Support for timestamp in the Cassandra Source for timestamp tracking.
0.2.4 (26 Jan 2017)
SELECT * FROM influx-topic WITHTIMESTAMP sys_time() WITHTAG(field1, CONSTANT_KEY1=CONSTANT_VALUE1, field2,CONSTANT_KEY2=CONSTANT_VALUE1)
The default consistency level is ALL. Use connect.influx.consistency.level to set it to ONE/QUORUM/ALL/ANY.
connect.influx.sink.route.query was renamed to connect.influx.sink.kcql.
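A combined sketch of these options, assuming placeholder topic and measurement names (and using the pre-rename connect.influx.sink.kcql key from this release):

```
# InfluxDB sink consistency level plus tag/timestamp KCQL (placeholder names)
connect.influx.consistency.level=QUORUM
connect.influx.sink.kcql=INSERT INTO sensorMeasure SELECT * FROM influx-topic WITHTIMESTAMP sys_time() WITHTAG(field1, CONSTANT_KEY1=CONSTANT_VALUE1)
```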
0.2.3 (5 Jan 2017)
Added support for Struct, Schema.STRING and Json with schema in the Cassandra, ReThinkDB, InfluxDB and MongoDB sinks.
Renamed export.query.route to sink.kcql.
Renamed import.query.route to source.kcql.
Added STOREAS to specify target sink types, e.g. Redis Sorted Sets, Hazelcast maps, queues and ring buffers.

Requires Gradle 6.0 to build.
To build
gradle compile
To test
gradle test
To create a fat jar
gradle shadowJar
You can also use the gradle wrapper
./gradlew shadowJar
To view dependency trees
gradle dependencies # or
gradle :kafka-connect-cassandra:dependencies
To build a particular project
gradle :kafka-connect-elastic5:build
To create a jar of a particular project:
gradle :kafka-connect-elastic5:shadowJar
We'd love to accept your contributions! Please use GitHub pull requests: fork the repo, develop and test your code, commit semantically, and submit a pull request. Thanks!