flink-pulsar-connector examples

淳于升
2023-12-01

Versions:

Pulsar: 2.7.1

Flink: 1.12.2

Maven dependency:

<!-- flink-pulsar dependency -->
    <dependency>
      <groupId>io.streamnative.connectors</groupId>
      <artifactId>pulsar-flink-connector_2.11</artifactId>
      <version>1.12.2.1</version>
    </dependency>
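The _2.11 suffix is the Scala version the connector is built against, and the 1.12.2.1 version line tracks Flink 1.12.2, so the connector should be kept in step with your Flink version. For a Flink build on Scala 2.12, the dependency would presumably look like this (assumption: StreamNative publishes the same version line under the _2.12 suffix):

    <!-- hypothetical Scala 2.12 variant of the same connector version -->
    <dependency>
      <groupId>io.streamnative.connectors</groupId>
      <artifactId>pulsar-flink-connector_2.12</artifactId>
      <version>1.12.2.1</version>
    </dependency>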

Flink consuming from Pulsar:

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource;
import org.apache.flink.streaming.util.serialization.PulsarDeserializationSchema;
import java.util.Properties;

/**
 * @description: consume data from Pulsar with Flink
 * @author: txy
 * @date: 2021/5/11
 **/
public class PulsarToFlink {
    public static void main(String[] args) throws Exception {
        // create the execution environment and set its parameters
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.disableOperatorChaining();
        // checkpoint every 5 minutes with exactly-once semantics
        env.enableCheckpointing(60 * 1000 * 5, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.setStateBackend(new MemoryStateBackend());
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(0, 3 * 1000));

        // Pulsar connection parameters
        String serviceUrl = "pulsar://localhost:6650";
        String adminUrl = "http://localhost:8080";
        String topic = "persistent://tenant/namespace/topic";

        // create the Pulsar source
        Properties properties = new Properties();
        properties.setProperty("topic", topic);
        FlinkPulsarSource<String> pulsarSource = new FlinkPulsarSource<>(
                serviceUrl,
                adminUrl,
                PulsarDeserializationSchema.valueOnly(new SimpleStringSchema()),
                properties);
        pulsarSource.setStartFromEarliest();

        DataStreamSource<String> pulsarDS = env.addSource(pulsarSource);
        pulsarDS.print();

        env.execute();
    }
}
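setStartFromEarliest() replays the topic from the beginning. Other start positions can be configured on the source before the job starts; a minimal sketch, assuming the pulsar-flink 1.12.x API, which mirrors the Kafka consumer's start-position setters:

// consume only messages published after the job starts
pulsarSource.setStartFromLatest();

// or resume from an existing Pulsar subscription
// (assumption: setStartFromSubscription is available in this connector version)
// pulsarSource.setStartFromSubscription("my-subscription");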

Flink writing to Pulsar:

import ai.sd.sdbd.etl.news.util.KafkaUtil; // project-internal helper; a possible shape is sketched after this class
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSink;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions;
import org.apache.flink.streaming.util.serialization.PulsarSerializationSchemaWrapper;
import org.apache.flink.table.api.DataTypes;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

/**
 * @description: consume Kafka data with Flink and write it to the corresponding Pulsar topic
 * @author: txy
 * @date: 2021/5/11
 **/
public class FlinkToPulsar {
    public static void main(String[] args) throws Exception {
        // create the execution environment and set its parameters
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.disableOperatorChaining();
        // checkpoint every 5 minutes with exactly-once semantics
        env.enableCheckpointing(60 * 1000 * 5, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.setStateBackend(new MemoryStateBackend());
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(0, 3 * 1000));

        // Kafka parameters
        String groupId = "kafka-group-id-1";
        String newsTopic = "topic";

        // start from specific offsets for each partition of the source topic
        Map<KafkaTopicPartition, Long> offsets = new HashMap<>();
        offsets.put(new KafkaTopicPartition(newsTopic, 0), 50000L);
        offsets.put(new KafkaTopicPartition(newsTopic, 1), 50000L);

        // read from Kafka
        FlinkKafkaConsumer<ObjectNode> newsSource = KafkaUtil.getKafkaSource(newsTopic, groupId);
        newsSource.setStartFromSpecificOffsets(offsets);
        DataStreamSource<ObjectNode> newsDS = env.addSource(newsSource);
        SingleOutputStreamOperator<String> valueDS = newsDS.map(jsonNodes -> jsonNodes.get("value").toString());

        // target Pulsar topic
        String topic = "persistent://tenant/namespace/topic";

        // sink configuration
        Properties props = new Properties();
        props.setProperty("topic", topic);
        props.setProperty("partition.discovery.interval-millis", "5000");
        // flushing on checkpoint is disabled here, trading delivery guarantees for latency
        props.setProperty(PulsarOptions.FLUSH_ON_CHECKPOINT_OPTION_KEY, "false");

        // create the Pulsar sink
        FlinkPulsarSink<String> stringFlinkPulsarSink = new FlinkPulsarSink<>(
                "pulsar://localhost:6650",
                "http://localhost:18080",
                Optional.of(topic),
                props,
                new PulsarSerializationSchemaWrapper.Builder<>(new SimpleStringSchema()).useAtomicMode(DataTypes.STRING()).build()
        );

        valueDS.addSink(stringFlinkPulsarSink);
        valueDS.print();

        env.execute();
    }
}
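KafkaUtil is a project-internal helper that the original post does not show. A minimal sketch of a getKafkaSource that would fit the usage above, assuming it wraps FlinkKafkaConsumer with Flink's JSONKeyValueDeserializationSchema (which yields ObjectNode records carrying the "value" field the map() call reads); the broker address is a placeholder:

import java.util.Properties;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;

public class KafkaUtil {
    public static FlinkKafkaConsumer<ObjectNode> getKafkaSource(String topic, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.setProperty("group.id", groupId);
        // includeMetadata = false: the ObjectNode carries only "key" and "value" fields
        return new FlinkKafkaConsumer<>(topic, new JSONKeyValueDeserializationSchema(false), props);
    }
}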

 
