<!-- Pulsar Java client dependency; the admin/producer/consumer examples below use PulsarAdmin and PulsarClient -->
<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client-all</artifactId>
    <version>2.8.1</version>
</dependency>
public class CreateNamespaces {
public static void main(String[] args) throws Exception {
// 1 创建Pulsar的Admin管理对象
String serviceUrl = "http://hadoop001:6650";
PulsarAdmin admin = PulsarAdmin.builder()
.serviceHttpUrl(serviceUrl)
.build();
val tenantName = "my-tenant"
// ===============基于Pulsar的Admin对象进行相关的操作================
// 创建租户
String tenantName = "myTenant";
Set<String> allowedClusters = new HashSet<>();
allowedClusters.add("pulsar-cluster");
TenantInfo config = TenantInfo.builder().allowedClusters(allowedClusters).build();
admin.tenants().createTenant(tenantName,config);
// 查看当前有那些租户
String[] tenantNames = (String[]) admin.tenants().getTenants().toArray();
for (String name : tenantNames) {
System.out.println(name);
}
// 删除租户操作
admin.tenants().deleteTenant(tenantName);
// =========================关闭管理对象=============================
admin.close()
}
}
public class CreateNamespaces {
    /**
     * Namespace administration demo: creates a namespace, lists the namespaces of its tenant,
     * then deletes the namespace that was created.
     *
     * @param args unused
     * @throws Exception if any admin call fails
     */
    public static void main(String[] args) throws Exception {
        // Admin REST API lives on the HTTP port (8080), not the binary port 6650.
        String serviceUrl = "http://hadoop001:8080";
        String tenant = "my_tenant";
        String namespace = tenant + "/my_namespace";

        // 1 Create the admin object; closed automatically, even on failure.
        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl(serviceUrl)
                .build()) {
            // 2 Create the namespace (format: tenant/namespace).
            admin.namespaces().createNamespace(namespace);

            // 3 List all namespaces; getNamespaces expects a TENANT name, not a namespace name.
            System.out.println("获取当前有那些名称空间:");
            List<String> namespaces = admin.namespaces().getNamespaces(tenant);
            for (String ns : namespaces) {
                System.out.println(ns);
            }

            // 4 Delete the namespace created in step 2.
            admin.namespaces().deleteNamespace(namespace);
        }
    }
}
public class CreateTopic {
    /**
     * Topic administration demo: creates a topic, lists the topics of a namespace,
     * increases the partition count of a partitioned topic and deletes a topic.
     *
     * <p>Uncomment the variant of step 2 you want to try. Steps 4 and 5 operate on
     * {@code my-topic1} and {@code my-topic3}, which must already exist (created by
     * variants 2.1 / 2.3 in an earlier run); otherwise those calls fail.
     *
     * @param args unused
     * @throws Exception if any admin call fails
     */
    public static void main(String[] args) throws Exception {
        // Admin REST API lives on the HTTP port (8080), not the binary port 6650.
        String serviceUrl = "http://hadoop001:8080";

        // 1 Create the admin object; closed automatically, even on failure.
        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl(serviceUrl)
                .build()) {
            // 2 Create a topic
            // 2.1 persistent, partitioned (3 partitions)
            //admin.topics().createPartitionedTopic("persistent://my_tenant/my_namespace/my-topic1", 3);
            // 2.2 non-persistent, partitioned (3 partitions)
            //admin.topics().createPartitionedTopic("non-persistent://my_tenant/my_namespace/my-topic2", 3);
            // 2.3 persistent, non-partitioned
            //admin.topics().createNonPartitionedTopic("persistent://my_tenant/my_namespace/my-topic3");
            // 2.4 non-persistent, non-partitioned
            admin.topics().createNonPartitionedTopic("non-persistent://my_tenant/my_namespace/my-topic4");

            // 3 List the topics of the namespace
            // getList: all topics (a partitioned topic appears as its individual -partition-N topics)
            List<String> topics = admin.topics().getList("my_tenant/my_namespace");
            for (String topic : topics) {
                System.out.println(topic);
            }
            // getPartitionedTopicList: only the partitioned topics
            topics = admin.topics().getPartitionedTopicList("my_tenant/my_namespace");
            for (String topic : topics) {
                System.out.println(topic);
            }

            // 4 Update a partitioned topic: raise the partition count to 5 (it can only be increased)
            admin.topics().updatePartitionedTopic("persistent://my_tenant/my_namespace/my-topic1", 5);
            int partitions = admin.topics()
                    .getPartitionedTopicMetadata("persistent://my_tenant/my_namespace/my-topic1")
                    .partitions;
            System.out.println("topic的分区数为:" + partitions);

            // 5 Delete a topic
            // non-partitioned topic
            admin.topics().delete("persistent://my_tenant/my_namespace/my-topic3");
            // partitioned topic
            //admin.topics().deletePartitionedTopic("non-persistent://my_tenant/my_namespace/my-topic2");
        }
    }
}
public class PulsarProducerSyncTest {
    /**
     * Sends a single message synchronously to my-topic3.
     *
     * @param args unused
     * @throws Exception if the client/producer cannot be created or the send fails
     */
    public static void main(String[] args) throws Exception {
        // 1 + 2: PulsarClient talks the binary protocol, so the URL is pulsar://host:6650
        // (http:// would require the 8080 web port). Resources are closed in reverse order:
        // producer first, then client, even if send() throws.
        try (PulsarClient client = PulsarClient.builder()
                     .serviceUrl("pulsar://hadoop001:6650")
                     .build();
             Producer<byte[]> producer = client.newProducer()
                     .topic("persistent://my_tenant/my_namespace/my-topic3")
                     .create()) {
            // 3 send() blocks until the broker has acknowledged the message.
            producer.send("Pulsar".getBytes(java.nio.charset.StandardCharsets.UTF_8));
        }
    }
}
public class PulsarProducerAsyncTest {
    /**
     * Sends a single message asynchronously to my-topic3 and waits for it to be persisted
     * before shutting down.
     *
     * @param args unused
     * @throws Exception if the client/producer cannot be created or flushing fails
     */
    public static void main(String[] args) throws Exception {
        // PulsarClient uses the binary protocol: pulsar://host:6650.
        try (PulsarClient client = PulsarClient.builder()
                     .serviceUrl("pulsar://hadoop001:6650")
                     .build();
             Producer<byte[]> producer = client.newProducer()
                     .topic("persistent://my_tenant/my_namespace/my-topic3")
                     .create()) {
            // 3 sendAsync() only queues the message; the callback reports the outcome.
            producer.sendAsync("Pulsar".getBytes(java.nio.charset.StandardCharsets.UTF_8))
                    .whenComplete((messageId, error) -> {
                        if (error != null) {
                            System.err.println("send failed: " + error);
                        } else {
                            System.out.println("sent: " + messageId);
                        }
                    });
            // Wait until every buffered message is persisted, instead of sleeping for an
            // arbitrary amount of time and hoping the send has finished.
            producer.flush();
        }
    }
}
// Simulated Pulsar consumer
public class PulsarConsumerTest {
    /**
     * Receives messages from my-topic3 one at a time, forever, printing and acknowledging each.
     *
     * @param args unused
     * @throws Exception if the client/consumer cannot be created or receive() fails
     */
    public static void main(String[] args) throws Exception {
        // PulsarClient uses the binary protocol: pulsar://host:6650.
        // try-with-resources releases the consumer and client if receive() throws.
        try (PulsarClient client = PulsarClient.builder()
                     .serviceUrl("pulsar://hadoop001:6650")
                     .build();
             Consumer<byte[]> consumer = client.newConsumer()
                     .topic("persistent://my_tenant/my_namespace/my-topic3")
                     .subscriptionName("my-subscription") // subscription name, similar to a consumer group
                     .subscribe()) {
            // 3 Receive in a loop
            while (true) {
                Message<byte[]> message = consumer.receive();
                try {
                    System.out.println("消息为:" + new String(message.getData(),
                            java.nio.charset.StandardCharsets.UTF_8));
                    consumer.acknowledge(message);
                } catch (Exception e) {
                    // Processing failed: negatively acknowledge so the broker redelivers it later.
                    consumer.negativeAcknowledge(message);
                }
            }
        }
    }
}
public class PulsarConsumerBatchTest {
    /**
     * Receives messages from my-topic3 in batches, forever, printing and acknowledging each message.
     *
     * @param args unused
     * @throws Exception if the client/consumer cannot be created or batchReceive() fails
     */
    public static void main(String[] args) throws Exception {
        // PulsarClient uses the binary protocol: pulsar://host:6650.
        try (PulsarClient client = PulsarClient.builder()
                     .serviceUrl("pulsar://hadoop001:6650")
                     .build();
             Consumer<byte[]> consumer = client.newConsumer()
                     .topic("persistent://my_tenant/my_namespace/my-topic3")
                     .subscriptionName("my-subscription")
                     .batchReceivePolicy(BatchReceivePolicy.builder()
                             // max number of messages per batch (default -1 = no limit)
                             .maxNumMessages(100)
                             // max total bytes per batch, not per message (default 10 * 1024 * 1024)
                             .maxNumBytes(1024 * 1024)
                             // max wait before returning a partial batch (default 100 ms)
                             .timeout(200, TimeUnit.MILLISECONDS)
                             .build())
                     .subscribe()) {
            // 3 Receive batches in a loop; a batch completes when any of the limits above is hit.
            while (true) {
                Messages<byte[]> messages = consumer.batchReceive();
                for (Message<byte[]> message : messages) {
                    try {
                        System.out.println("消息为:" + new String(message.getData(),
                                java.nio.charset.StandardCharsets.UTF_8));
                        consumer.acknowledge(message);
                    } catch (Exception e) {
                        // Processing failed: negatively acknowledge so the broker redelivers it later.
                        consumer.negativeAcknowledge(message);
                    }
                }
            }
        }
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>flink_log</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Single source of truth for the Flink / Scala versions used by every Flink artifact below. -->
    <properties>
        <flink.version>1.13.3</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink Pulsar connector (StreamNative); its version line must match the Flink version -->
        <dependency>
            <groupId>io.streamnative.connectors</groupId>
            <artifactId>pulsar-flink-connector_${scala.binary.version}</artifactId>
            <version>1.13.1.3</version>
        </dependency>
        <!-- fastjson versions before 1.2.83 are affected by a deserialization RCE (CVE-2022-25845) -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <!-- Builds an additional fat jar (jar-with-dependencies) during the package phase -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
public class ConsumerLogToAnotherTopic {
    /**
     * Flink job that reads string messages from one Pulsar topic and writes them unchanged to another.
     *
     * @param args unused
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        // 1. Flink environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.disableOperatorChaining();
        // Checkpoint every 5 minutes; a checkpoint not finished within 60 s is aborted.
        env.enableCheckpointing(60 * 1000 * 5, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.setStateBackend(new MemoryStateBackend());
        // NOTE(review): 0 restart attempts means the job is never restarted after a failure
        // (3 s delay between attempts); raise the first argument if automatic recovery is wanted.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(0, 3 * 1000));

        // 2. Pulsar endpoints: binary protocol for data, HTTP for the admin API
        String serviceUrl = "pulsar://hadoop001:6650";
        String adminUrl = "http://hadoop001:8080";
        String topic = "persistent://public/default/my_topic_01";

        // 3. Pulsar source. Only source-side options go here: partition discovery is a
        //    source option, whereas pulsar.producer.* options only affect the sink.
        Properties properties = new Properties();
        properties.setProperty("topic", topic);
        properties.setProperty("partition.discovery.interval-millis", "50");
        FlinkPulsarSource<String> pulsarSource = new FlinkPulsarSource<>(
                serviceUrl,
                adminUrl,
                PulsarDeserializationSchema.valueOnly(new SimpleStringSchema()),
                properties);
        pulsarSource.setStartFromSubscription("my_subscription");

        // 4. Consume
        DataStreamSource<String> pulsarDS = env.addSource(pulsarSource);

        // 5. Pulsar sink
        String topic2 = "persistent://public/default/my_topic_02";
        Properties props = new Properties();
        props.setProperty("topic", topic2);
        // Block the producer when its send queue is full instead of failing the write.
        props.setProperty("pulsar.producer.blockIfQueueFull", "true");
        // Flush pending messages on every checkpoint, so a completed checkpoint implies the data
        // reached Pulsar; disabling it forfeits the delivery guarantee checkpointing is meant to give.
        props.setProperty(PulsarOptions.FLUSH_ON_CHECKPOINT_OPTION_KEY, "true");
        FlinkPulsarSink<String> stringFlinkPulsarSink = new FlinkPulsarSink<>(
                serviceUrl,
                adminUrl,
                Optional.of(topic2),
                props,
                new PulsarSerializationSchemaWrapper.Builder<>(new SimpleStringSchema())
                        .useAtomicMode(DataTypes.STRING())
                        .build());

        // 6. Write to the second topic
        pulsarDS.addSink(stringFlinkPulsarSink);
        env.execute();
    }
}
下表中的参数对应于 Stream API 中 FlinkPulsarSource、FlinkPulsarSink 构造参数里的 Properties 对象，以及 Table 模式中的配置属性。
Parameter | Default value | Description | Effective range |
---|---|---|---|
topic | null | Pulsar topic | source |
topics | null | 多个 topic，以逗号分隔 | source |
topicspattern | null | Java regular expression used to match multiple Pulsar topics | source |
partition.discovery.interval-millis | -1 | 自动发现新增或删除的主题（分区）的时间间隔，以毫秒为单位。设置为 -1 表示关闭。 | source |
clientcachesize | 100 | 设置缓存的Pulsar客户端数量。 | source, sink |
auth-params | null | 设置Pulsar客户端认证参数。 | source, sink |
auth-plugin-classname | null | 设置Pulsar客户端的身份验证类名。 | source, sink |
flushoncheckpoint | true | Flush all pending messages to Pulsar when a Flink checkpoint is taken. | sink |
failonwrite | false | 写入 Pulsar 出错时是否使作业失败；为 false 时仅记录错误并继续处理后续消息。 | sink |
polltimeoutms | 120000 | 设置等待获取下一条消息的超时时间,以毫秒为单位。 | source |
pulsar.reader.fail-on-data-loss | true | When data is lost, the operation fails. | source |
pulsar.reader.use-earliest-when-data-loss | false | When data is lost, use earliest reset offset. | source |
commitmaxretries | 3 | 向 Pulsar 提交消费位置（偏移量）时的最大重试次数。 | source |
send-delay-millisecond | 0 | Delay, in milliseconds, before a message is delivered. Only applies to the Table API; in the Stream API use `PulsarSerializationSchema.setDeliverAtExtractor` instead. | sink |
scan.startup.mode | null | Startup position: earliest, latest, or the position of a specified subscription. It is a required parameter. | source |
enable-key-hash-range | false | Enable the Key-Shared subscription mode. | source |
pulsar.reader.* | - | For details about Pulsar reader configurations, see Pulsar reader. | source |
pulsar.reader.subscriptionRolePrefix | flink-pulsar- | Prefix of the automatically generated subscription name, used when no subscription name is specified. | source |
pulsar.reader.receiverQueueSize | 1000 | Set the receive queue size. | source |
pulsar.producer.* | - | For details about Pulsar producer configurations, see Pulsar producer. | Sink |
pulsar.producer.sendTimeoutMs | 30000 | Set the timeout for sending a message, in unit of milliseconds. | Sink |
pulsar.producer.blockIfQueueFull | false | When the producer's pending-message queue is full, block the send call instead of throwing an exception. | Sink |