Java Pulsar Client and Flink Pulsar Connector Examples

徐经武
2023-12-01

Pulsar Java Client

Maven dependency

 <dependency>
     <groupId>org.apache.pulsar</groupId>
     <artifactId>pulsar-client-all</artifactId>
     <version>2.8.1</version>
 </dependency>

Tenant operations

import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.TenantInfo;

public class CreateTenant {
    public static void main(String[] args) throws Exception {
        // 1. Create the Pulsar admin client (HTTP service URL, default web service port 8080)
        String serviceUrl = "http://hadoop001:8080";
        PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl(serviceUrl)
                .build();

        // ============ Operations through the Pulsar admin client ============
        // Create a tenant; "pulsar-cluster" must be the name of an existing cluster
        String tenantName = "my_tenant";
        Set<String> allowedClusters = new HashSet<>();
        allowedClusters.add("pulsar-cluster");
        TenantInfo config = TenantInfo.builder().allowedClusters(allowedClusters).build();
        admin.tenants().createTenant(tenantName, config);

        // List the existing tenants (getTenants() already returns a List<String>)
        List<String> tenantNames = admin.tenants().getTenants();
        for (String name : tenantNames) {
            System.out.println(name);
        }

        // Delete the tenant (it must not contain any namespaces)
        admin.tenants().deleteTenant(tenantName);

        // ===================== Close the admin client =====================
        admin.close();
    }
}

Namespace operations

import java.util.List;

import org.apache.pulsar.client.admin.PulsarAdmin;

public class CreateNamespaces {
    public static void main(String[] args) throws Exception {
        // 1. Create the Pulsar admin client (HTTP service URL, default web service port 8080)
        String serviceUrl = "http://hadoop001:8080";
        PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl(serviceUrl)
                .build();
        // 2. Create a namespace, named "tenant/namespace"; the tenant must already exist
        admin.namespaces().createNamespace("my_tenant/my_namespace");
        // 3. List all namespaces of the tenant (getNamespaces takes the tenant name)
        System.out.println("Namespaces of my_tenant:");
        List<String> namespaces = admin.namespaces().getNamespaces("my_tenant");
        for (String namespace : namespaces) {
            System.out.println(namespace);
        }
        // 4. Delete the namespace
        admin.namespaces().deleteNamespace("my_tenant/my_namespace");
        // 5. Release resources
        admin.close();
    }
}
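In the admin API, createNamespace and deleteNamespace take the full "tenant/namespace" name, while getNamespaces takes only the tenant. The sketch below is a small hypothetical helper (NamespaceNames is not part of the Pulsar client) that splits a namespace name into those two parts. It assumes the two-segment "tenant/namespace" form used throughout this post.

```java
import java.util.Objects;

// Hypothetical helper, not part of the Pulsar client: splits "tenant/namespace".
// Assumes the two-segment namespace form used in the examples above.
public class NamespaceNames {

    // The tenant part, i.e. what admin.namespaces().getNamespaces(...) expects
    public static String tenantOf(String namespace) {
        return split(namespace)[0];
    }

    // The namespace part without the tenant prefix
    public static String localNameOf(String namespace) {
        return split(namespace)[1];
    }

    private static String[] split(String namespace) {
        Objects.requireNonNull(namespace, "namespace");
        String[] parts = namespace.split("/", -1);
        if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
            throw new IllegalArgumentException("expected \"tenant/namespace\", got: " + namespace);
        }
        return parts;
    }

    public static void main(String[] args) {
        System.out.println(tenantOf("my_tenant/my_namespace"));    // my_tenant
        System.out.println(localNameOf("my_tenant/my_namespace")); // my_namespace
    }
}
```

Passing a bare namespace such as "my_namespace" fails fast with an IllegalArgumentException. This makes a mix-up between tenant and namespace names visible before any admin call is made.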

Topic operations

import java.util.List;

import org.apache.pulsar.client.admin.PulsarAdmin;

public class CreateTopic {
    public static void main(String[] args) throws Exception {
        // 1. Create the Pulsar admin client (HTTP service URL, default web service port 8080)
        String serviceUrl = "http://hadoop001:8080";
        PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl(serviceUrl)
                .build();

        // 2. Create topics (the namespace my_tenant/my_namespace must exist)
        // 2.1 Persistent, partitioned topic with 3 partitions
        admin.topics().createPartitionedTopic("persistent://my_tenant/my_namespace/my-topic1", 3);
        // 2.2 Non-persistent, partitioned topic with 3 partitions
        admin.topics().createPartitionedTopic("non-persistent://my_tenant/my_namespace/my-topic2", 3);
        // 2.3 Persistent, non-partitioned topic
        admin.topics().createNonPartitionedTopic("persistent://my_tenant/my_namespace/my-topic3");
        // 2.4 Non-persistent, non-partitioned topic
        admin.topics().createNonPartitionedTopic("non-persistent://my_tenant/my_namespace/my-topic4");

        // 3. List the topics in a namespace
        // Topics in the namespace; a partitioned topic is listed as its individual partitions
        List<String> topics = admin.topics().getList("my_tenant/my_namespace");
        for (String topic : topics) {
            System.out.println(topic);
        }
        // Partitioned topics only
        topics = admin.topics().getPartitionedTopicList("my_tenant/my_namespace");
        for (String topic : topics) {
            System.out.println(topic);
        }
        // 4. Update a topic: increase the number of partitions (it can only be increased)
        admin.topics().updatePartitionedTopic("persistent://my_tenant/my_namespace/my-topic1", 5);
        int partitions = admin.topics().getPartitionedTopicMetadata("persistent://my_tenant/my_namespace/my-topic1").partitions;
        System.out.println("Number of partitions: " + partitions);
        // 5. Delete topics
        // Non-partitioned topic
        admin.topics().delete("persistent://my_tenant/my_namespace/my-topic3");
        // Partitioned topic
        //admin.topics().deletePartitionedTopic("non-persistent://my_tenant/my_namespace/my-topic2");
        // 6. Release resources
        admin.close();
    }
}
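The topic names above follow the pattern {persistent|non-persistent}://tenant/namespace/topic. Each partition of a partitioned topic is an internal topic whose name ends in "-partition-<i>". This is why getList shows entries such as my-topic1-partition-0. The sketch below models that naming with a hypothetical TopicNames class; it is not the client's own TopicName class. It handles only the three-segment form used in this post.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper mirroring the naming scheme {domain}://{tenant}/{namespace}/{topic}.
// Simplification: only the three-segment form used in the examples above is accepted.
public class TopicNames {
    public final String domain;      // "persistent" or "non-persistent"
    public final String tenant;
    public final String namespace;
    public final String localName;

    private TopicNames(String domain, String tenant, String namespace, String localName) {
        this.domain = domain;
        this.tenant = tenant;
        this.namespace = namespace;
        this.localName = localName;
    }

    public static TopicNames parse(String fullName) {
        int sep = fullName.indexOf("://");
        if (sep < 0) {
            throw new IllegalArgumentException("missing domain prefix: " + fullName);
        }
        String domain = fullName.substring(0, sep);
        if (!domain.equals("persistent") && !domain.equals("non-persistent")) {
            throw new IllegalArgumentException("unknown domain: " + domain);
        }
        String[] parts = fullName.substring(sep + 3).split("/", -1);
        if (parts.length != 3 || parts[0].isEmpty() || parts[1].isEmpty() || parts[2].isEmpty()) {
            throw new IllegalArgumentException("expected tenant/namespace/topic: " + fullName);
        }
        return new TopicNames(domain, parts[0], parts[1], parts[2]);
    }

    // The "tenant/namespace" string passed to admin.topics().getList(...)
    public String namespaceName() {
        return tenant + "/" + namespace;
    }

    @Override
    public String toString() {
        return domain + "://" + tenant + "/" + namespace + "/" + localName;
    }

    // Internal per-partition topic names: "<topic>-partition-<i>" for i = 0 .. partitions - 1
    public List<String> partitionNames(int partitions) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < partitions; i++) {
            names.add(this + "-partition-" + i);
        }
        return names;
    }

    public static void main(String[] args) {
        TopicNames t = parse("persistent://my_tenant/my_namespace/my-topic1");
        System.out.println(t.namespaceName()); // my_tenant/my_namespace
        System.out.println(t.partitionNames(3));
    }
}
```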

Producing data

1. Synchronous mode

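import java.nio.charset.StandardCharsets;

import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;

public class PulsarProducerSyncTest {
    public static void main(String[] args) throws Exception {
        // 1. Create the Pulsar client (binary protocol URL, default broker port 6650)
        ClientBuilder clientBuilder = PulsarClient.builder();
        clientBuilder.serviceUrl("pulsar://hadoop001:6650");
        PulsarClient client = clientBuilder.build();
        // 2. Create a producer through the client
        Producer<byte[]> producer = client.newProducer()
                .topic("persistent://my_tenant/my_namespace/my-topic3")
                .create();
        // 3. Send a message; send() blocks until the broker has acknowledged it
        producer.send("Pulsar".getBytes(StandardCharsets.UTF_8));
        // 4. Release resources
        producer.close();
        client.close();
    }
}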
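A Producer<byte[]> and a Consumer<byte[]> exchange raw bytes, so both sides must agree on the character encoding. String.getBytes() and new String(byte[]) without an explicit charset use the platform default, which can differ between machines. The sketch below uses a hypothetical helper, Utf8Payloads, that fixes the encoding to UTF-8 on both sides. It shows that non-ASCII text round-trips correctly.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: encode and decode byte[] payloads with an explicit charset
public class Utf8Payloads {

    // Producer side: String -> bytes, always UTF-8
    public static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: bytes -> String, always UTF-8
    public static String decode(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String text = "Pulsar \u6d88\u606f"; // "Pulsar " followed by two Chinese characters
        byte[] payload = encode(text);
        System.out.println(payload.length);               // 13: 7 ASCII bytes + 2 x 3 bytes
        System.out.println(decode(payload).equals(text)); // true
    }
}
```

The Pulsar client also offers typed producers and consumers, such as client.newProducer(Schema.STRING) and client.newConsumer(Schema.STRING). With these, the client performs the string encoding itself, so no manual conversion is needed.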

2. Asynchronous mode

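import java.nio.charset.StandardCharsets;

import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;

public class PulsarProducerAsyncTest {
    public static void main(String[] args) throws Exception {
        // 1. Create the Pulsar client (binary protocol URL, default broker port 6650)
        ClientBuilder clientBuilder = PulsarClient.builder();
        clientBuilder.serviceUrl("pulsar://hadoop001:6650");
        PulsarClient client = clientBuilder.build();
        // 2. Create a producer through the client
        Producer<byte[]> producer = client.newProducer()
                .topic("persistent://my_tenant/my_namespace/my-topic3")
                .create();
        // 3. Send a message asynchronously; sendAsync() returns a CompletableFuture<MessageId>
        producer.sendAsync("Pulsar".getBytes(StandardCharsets.UTF_8))
                .thenAccept(msgId -> System.out.println("Sent, message ID: " + msgId));
        // Asynchronous sends are buffered on the client first, so closing immediately could
        // lose them; flush() blocks until all buffered messages are persisted, which is
        // more reliable than sleeping for a fixed time
        producer.flush();
        // 4. Release resources
        producer.close();
        client.close();
    }
}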
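When several messages are sent with sendAsync(), each call returns its own CompletableFuture<MessageId>. Waiting on all of those futures together is a deterministic alternative to a fixed Thread.sleep. Producer.flush() is the client's built-in way to wait for buffered messages. The sketch below shows the future-collecting pattern in plain Java. Its fakeSendAsync method is a hypothetical stand-in for producer.sendAsync, so the example runs without a broker.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class AsyncSendSketch {

    // Hypothetical stand-in for producer.sendAsync(payload): completes later on an
    // "I/O" thread with a fake, increasing message ID. The payload itself is ignored.
    static CompletableFuture<Long> fakeSendAsync(byte[] payload, ExecutorService io, AtomicLong nextId) {
        return CompletableFuture.supplyAsync(nextId::getAndIncrement, io);
    }

    // Starts all sends without blocking per message, then waits once for all of them
    public static int sendAll(List<String> messages) {
        ExecutorService io = Executors.newFixedThreadPool(2);
        AtomicLong nextId = new AtomicLong();
        AtomicInteger confirmed = new AtomicInteger();
        try {
            List<CompletableFuture<Void>> pending = new ArrayList<>();
            for (String m : messages) {
                byte[] payload = m.getBytes(StandardCharsets.UTF_8);
                pending.add(fakeSendAsync(payload, io, nextId)
                        .thenAccept(id -> confirmed.incrementAndGet()));
            }
            // Block until every send (and its callback) has completed
            CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0])).join();
            return confirmed.get();
        } finally {
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sendAll(Arrays.asList("a", "b", "c")) + " messages confirmed"); // 3 messages confirmed
    }
}
```

With the real producer, the same loop would collect the futures returned by producer.sendAsync(...). A failed send would then surface as an exception from join() instead of passing silently.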

Consuming data

1. Synchronous mode

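import java.nio.charset.StandardCharsets;

import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;

// A simple Pulsar consumer
public class PulsarConsumerTest {
    public static void main(String[] args) throws Exception {
        // 1. Create the Pulsar client (binary protocol URL, default broker port 6650)
        ClientBuilder clientBuilder = PulsarClient.builder();
        clientBuilder.serviceUrl("pulsar://hadoop001:6650");
        PulsarClient client = clientBuilder.build();
        // 2. Create a consumer through the client
        Consumer<byte[]> consumer = client.newConsumer()
                .topic("persistent://my_tenant/my_namespace/my-topic3")
                .subscriptionName("my-subscription")  // subscription name, similar to a consumer group
                .subscribe();
        // 3. Receive messages in a loop; receive() blocks until a message arrives
        while (true) {
            Message<byte[]> message = consumer.receive();
            try {
                System.out.println("Message: " + new String(message.getData(), StandardCharsets.UTF_8));
                // Acknowledge so that the broker does not redeliver the message
                consumer.acknowledge(message);
            } catch (Exception e) {
                // Negative acknowledgement: the broker redelivers the message later
                consumer.negativeAcknowledge(message);
            }
        }
    }
}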

2. Batch receiving

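import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.PulsarClient;

public class PulsarConsumerBatchTest {
    public static void main(String[] args) throws Exception {
        // 1. Create the Pulsar client (binary protocol URL, default broker port 6650)
        ClientBuilder clientBuilder = PulsarClient.builder();
        clientBuilder.serviceUrl("pulsar://hadoop001:6650");
        PulsarClient client = clientBuilder.build();
        // 2. Create a consumer with a batch receive policy
        Consumer<byte[]> consumer = client.newConsumer()
                .topic("persistent://my_tenant/my_namespace/my-topic3")
                .subscriptionName("my-subscription")
                .batchReceivePolicy(BatchReceivePolicy.builder()
                        // Maximum number of messages per batch (default: -1, no count limit)
                        .maxNumMessages(100)
                        // Maximum total size of one batch in bytes (default: 10 * 1024 * 1024)
                        .maxNumBytes(1024 * 1024)
                        // How long to wait for a batch to fill up (default: 100 ms)
                        .timeout(200, TimeUnit.MILLISECONDS)
                        .build())
                .subscribe();
        // 3. Receive in a loop; batchReceive() blocks until one of the limits above is reached
        while (true) {
            Messages<byte[]> messages = consumer.batchReceive(); // receive a batch of messages
            for (Message<byte[]> message : messages) {
                try {
                    System.out.println("Message: " + new String(message.getData(), StandardCharsets.UTF_8));
                    consumer.acknowledge(message);
                } catch (Exception e) {
                    consumer.negativeAcknowledge(message);
                }
            }
        }
    }
}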
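The three BatchReceivePolicy settings work together: a batch is returned as soon as it reaches the message-count limit or the byte limit, or when the timeout expires. The sketch below is a simplified, hypothetical model of the first two limits. It cuts a sequence of message sizes into batches the way the limits 100 messages and 1 MiB from the example above would. The model assumes a message that would push a batch past a limit starts the next batch, and it ignores the timeout. The real client's boundary handling may differ in details.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of BatchReceivePolicy's count and byte limits.
// Not the client's implementation: the timeout is not modelled at all.
public class BatchLimitsModel {

    // Splits message sizes (in bytes) into batches. A limit <= 0 means "no limit",
    // as with the -1 default of maxNumMessages. A single message larger than
    // maxNumBytes forms a batch of its own in this model.
    public static List<List<Integer>> split(List<Integer> sizes, int maxNumMessages, long maxNumBytes) {
        List<List<Integer>> batches = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        long currentBytes = 0;
        for (int size : sizes) {
            boolean countLimitHit = maxNumMessages > 0 && current.size() + 1 > maxNumMessages;
            boolean byteLimitHit = maxNumBytes > 0 && currentBytes + size > maxNumBytes;
            if (!current.isEmpty() && (countLimitHit || byteLimitHit)) {
                // Adding this message would exceed a limit: close the current batch
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(size);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            batches.add(current); // remainder; in the real client the timeout would release it
        }
        return batches;
    }

    public static void main(String[] args) {
        // Limits from the example: 100 messages or 1 MiB per batch
        List<Integer> sizes = Arrays.asList(400_000, 400_000, 400_000, 100_000);
        System.out.println(split(sizes, 100, 1024 * 1024)); // [[400000, 400000], [400000, 100000]]
    }
}
```

In this model, the byte limit cuts the first batch after two 400 KB messages, long before the count limit of 100 matters. With many small messages, the count limit or the timeout would decide instead.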

Pulsar Flink Connector

Maven dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink_log</artifactId>
    <version>1.0-SNAPSHOT</version>


    <properties>
        <flink.version>1.13.3</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Flink Pulsar connector; the 1.13.x.y release line targets Flink 1.13 -->
        <dependency>
            <groupId>io.streamnative.connectors</groupId>
            <artifactId>pulsar-flink-connector_${scala.binary.version}</artifactId>
            <version>1.13.1.3</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>
    </dependencies>


    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

Consuming Pulsar data with Flink and writing it to another topic


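import java.util.Optional;
import java.util.Properties;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSink;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions;
import org.apache.flink.streaming.util.serialization.PulsarDeserializationSchema;
import org.apache.flink.streaming.util.serialization.PulsarSerializationSchemaWrapper;
import org.apache.flink.table.api.DataTypes;

public class ConsumerLogToAnotherTopic {
    public static void main(String[] args) throws Exception {
        // 1. Create the Flink environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.disableOperatorChaining();
        // Exactly-once checkpoint every 5 minutes, with a 60 s checkpoint timeout
        env.enableCheckpointing(5 * 60 * 1000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.setStateBackend(new MemoryStateBackend());
        // 0 restart attempts (3 s delay): the job fails on the first error
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(0, 3 * 1000));

        // 2. Pulsar connection parameters
        String serviceUrl = "pulsar://hadoop001:6650";  // broker service URL (binary protocol)
        String adminUrl = "http://hadoop001:8080";      // admin (HTTP) URL
        String topic = "persistent://public/default/my_topic_01";

        // 3. Create the Pulsar source
        Properties properties = new Properties();
        properties.setProperty("topic", topic);
        // Source-side option: how often (ms) to check for added or removed partitions
        properties.setProperty("partition.discovery.interval-millis", "50");

        FlinkPulsarSource<String> pulsarSource = new FlinkPulsarSource<>(
                serviceUrl,
                adminUrl,
                PulsarDeserializationSchema.valueOnly(new SimpleStringSchema()),
                properties);
        // Start from the position of the subscription "my_subscription"
        pulsarSource.setStartFromSubscription("my_subscription");

        // 4. Consume the data
        DataStreamSource<String> pulsarDS = env.addSource(pulsarSource);

        // 5. Create the Pulsar sink
        String topic2 = "persistent://public/default/my_topic_02";

        Properties props = new Properties();
        props.setProperty("topic", topic2);
        // Producer-side option: block instead of throwing when the send queue is full
        props.setProperty("pulsar.producer.blockIfQueueFull", "true");
        // false: messages still buffered in the producer are not flushed at checkpoints,
        // so they may be lost on failure (no at-least-once guarantee for the sink)
        props.setProperty(PulsarOptions.FLUSH_ON_CHECKPOINT_OPTION_KEY, "false");

        FlinkPulsarSink<String> stringFlinkPulsarSink = new FlinkPulsarSink<>(
                serviceUrl,
                adminUrl,
                Optional.of(topic2),
                props,
                new PulsarSerializationSchemaWrapper.Builder<>(new SimpleStringSchema())
                        .useAtomicMode(DataTypes.STRING())
                        .build());

        // 6. Write to the second topic
        pulsarDS.addSink(stringFlinkPulsarSink);

        env.execute();
    }
}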

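Parameter reference

These parameters correspond to the Properties object passed to the FlinkPulsarSource and FlinkPulsarSink constructors in the DataStream API, and to the configuration properties in Table mode.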

Parameter | Default | Description | Scope
topic | null | A single Pulsar topic | source
topics | null | Multiple topics, separated by commas | source
topicspattern | null | Multiple Pulsar topics matched by a Java regular expression | source
partition.discovery.interval-millis | -1 | Interval, in milliseconds, for automatically discovering added or removed topics; -1 disables discovery | source
clientcachesize | 100 | Number of cached Pulsar clients | source, sink
auth-params | null | Authentication parameters of the Pulsar client | source, sink
auth-plugin-classname | null | Class name of the Pulsar client authentication plugin | source, sink
flushoncheckpoint | true | Flush pending messages to the Pulsar topic on each checkpoint | sink
failonwrite | false | Whether a write error fails the job; with false, the sink keeps going and continues confirming messages | sink
polltimeoutms | 120000 | Timeout, in milliseconds, for waiting for the next message | source
pulsar.reader.fail-on-data-loss | true | Fail the job when data loss is detected | source
pulsar.reader.use-earliest-when-data-loss | false | When data loss is detected, reset to the earliest offset | source
commitmaxretries | 3 | Maximum number of retries when committing the offset of Pulsar messages | source
send-delay-millisecond | 0 | Delay, in milliseconds, for delayed message delivery; Table API only (in the DataStream API, use PulsarSerializationSchema.setDeliverAtExtractor) | sink
scan.startup.mode | null | Start position: earliest, latest, or the position of the subscription; required | source
enable-key-hash-range | false | Enable the Key-Shared subscription mode | source
pulsar.reader.* | - | Pulsar reader configuration; see the Pulsar reader documentation | source
pulsar.reader.subscriptionRolePrefix | flink-pulsar- | Prefix of the automatically created subscription name when no subscription is specified | source
pulsar.reader.receiverQueueSize | 1000 | Size of the receive queue | source
pulsar.producer.* | - | Pulsar producer configuration; see the Pulsar producer documentation | sink
pulsar.producer.sendTimeoutMs | 30000 | Timeout, in milliseconds, for sending a message | sink
pulsar.producer.blockIfQueueFull | false | When the producer's send queue is full, block instead of throwing an exception | sink
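The source keys in the table are plain string properties. The topic selection can be given as topic, as a comma-separated topics list, or as a topicspattern regular expression, and discovery is switched off with -1. The sketch below assembles such a Properties object with a hypothetical helper, PulsarSourceProps, which is not part of the connector. Each method sets only one of the three topic keys. This assumes a source should be configured with exactly one of them. The example topic names and pattern are illustrative.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;

// Hypothetical helper, not part of the connector: builds FlinkPulsarSource properties
// from the keys listed in the table. Each method sets exactly one topic key.
public class PulsarSourceProps {

    // "topic": a single topic
    public static Properties singleTopic(String topic) {
        Properties p = new Properties();
        p.setProperty("topic", topic);
        return p;
    }

    // "topics": several topics joined with commas
    public static Properties multipleTopics(List<String> topics) {
        Properties p = new Properties();
        p.setProperty("topics", String.join(",", topics));
        return p;
    }

    // "topicspattern": a Java regular expression, compiled here only to fail fast
    public static Properties topicPattern(String regex) {
        Pattern.compile(regex); // throws PatternSyntaxException for an invalid regex
        Properties p = new Properties();
        p.setProperty("topicspattern", regex);
        return p;
    }

    // "partition.discovery.interval-millis": -1 disables discovery
    public static Properties withDiscovery(Properties p, long intervalMillis) {
        p.setProperty("partition.discovery.interval-millis", Long.toString(intervalMillis));
        return p;
    }

    public static void main(String[] args) {
        Properties p = withDiscovery(multipleTopics(Arrays.asList(
                "persistent://public/default/my_topic_01",
                "persistent://public/default/my_topic_02")), -1);
        System.out.println(p.getProperty("topics"));
        System.out.println(p.getProperty("partition.discovery.interval-millis")); // -1
    }
}
```

The resulting Properties object would be passed as the last argument of the FlinkPulsarSource constructor, in the same way as the properties object in the Flink example.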