Flink + Pulsar: Async Redis Lookups, Writing the Results to PostgreSQL

归鹤龄
2023-12-01

The job below consumes JSON events from a Pulsar topic, enriches each event with an asynchronous Redis lookup through Flink's Async I/O API, and writes the resolved user IDs to PostgreSQL via the JDBC sink. Three files follow: the job (test.java), the async function (AsyncRedis2.java), and the Maven pom.xml.

test.java:

package Pulsar;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.pulsar.source.PulsarSource;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

/*********************************
 @Author:xiaoyan.qin
 @Description:
 @Date:Created in 18:05 2022/3/28
 @Modified By:
 **********************************/
public class test {
    private static final Logger LOGGER = LoggerFactory.getLogger(test.class);

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        // Checkpoint every 5 minutes
        env.enableCheckpointing(300000);
        // Set the checkpointing mode to exactly-once (this is the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Make sure at least 500 ms pass between checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

        // Checkpoints must complete within one minute, or they are discarded
        env.getCheckpointConfig().setCheckpointTimeout(60000);

        // Tolerate two consecutive checkpoint failures
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);

        // Allow only one checkpoint at a time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        env.getCheckpointConfig().setCheckpointStorage("file:///d/temp/checkpoint");


        // Use externalized checkpoints, so checkpoints are retained after the job is cancelled
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3,
                Time.of(10, TimeUnit.SECONDS)
        ));
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Pulsar source: subscribe to the "test" topic and start from the latest message
        PulsarSource<String> pulsarSource = PulsarSource.builder()
                .setServiceUrl("pulsar://localhost:6650")
                .setAdminUrl("http://localhost:8080")
                .setStartCursor(StartCursor.latest())
                .setTopics("test")
                .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema()))
                .setSubscriptionName("my-subscription")
                .setSubscriptionType(SubscriptionType.Exclusive)
                .build();



        DataStream<String> stream = env.fromSource(pulsarSource, WatermarkStrategy.forMonotonousTimestamps(), "Pulsar Source");


        // Async I/O: enrich each event with a Redis lookup; unordered results, 500 ms timeout
        SingleOutputStreamOperator<String> resultStream = AsyncDataStream
                .unorderedWait(stream, new AsyncRedis2("redis://localhost"), 500L, TimeUnit.MILLISECONDS)
                .setParallelism(1);

        // Write the resolved user IDs to PostgreSQL via the JDBC sink
        resultStream.addSink(JdbcSink.sink(
                "insert into test (id) values (?)",
                (ps, t) -> ps.setInt(1, Integer.parseInt(t)),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:postgresql://localhost/postgres")
                        .withDriverName("org.postgresql.Driver")
                        .withUsername("postgres")
                        .withPassword("mysecretpassword")
                        .build()));
//        Table inputTable = tEnv.fromDataStream(resultStream);
//        tEnv.createTemporaryView("InputTable", inputTable);
//        tEnv.sqlQuery("SELECT count(*) FROM InputTable").execute().print();
        //tEnv.from("InputTable").execute().print();
        resultStream.print();
       // stream.print();
        env.execute("qxy");
    }
}
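
For local testing it helps to publish a few events in the shape the job expects. The producer below is a minimal sketch that is not part of the original post; it assumes the same local broker (pulsar://localhost:6650), the "test" topic the source subscribes to, and messages of the form {"eventId":"..."}, since eventId is the only field the async function reads.

package Pulsar;

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;

// Hypothetical helper for local testing; not part of the original job.
public class SampleProducer {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650") // same broker as the job
                .build();
        Producer<byte[]> producer = client.newProducer()
                .topic("test") // topic the PulsarSource subscribes to
                .create();
        // AsyncRedis2 only reads the "eventId" field of each message
        for (int i = 0; i < 10; i++) {
            producer.send(("{\"eventId\":\"" + i + "\"}").getBytes());
        }
        producer.close();
        client.close();
    }
}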




AsyncRedis2.java:

package Pulsar;

import com.alibaba.fastjson.JSON;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.Collections;


/*********************************
 @Author:xiaoyan.qin
 @Description:
 @Date:Created in 10:58 2022/4/1
 @Modified By:
 **********************************/
public class AsyncRedis2 extends RichAsyncFunction<String, String> {
    private RedisAsyncCommands<String,String> async;
    private String url;
    private StatefulRedisConnection<String,String> connection;
    private RedisClient redisClient;
    public AsyncRedis2(String url) {
        this.url = url;
    }
    @Override
    public void open(Configuration parameters) throws Exception {
        // One Lettuce client and connection per parallel task; the async commands are thread-safe
        redisClient = RedisClient.create(url);
        connection = redisClient.connect();
        async = connection.async();
    }

    @Override
    public void asyncInvoke(String input, ResultFuture<String> resultFuture) throws Exception {
        String eventId = JSON.parseObject(input).get("eventId").toString();
        RedisFuture<String> redisFuture = async.hget("userlogin:" + eventId, "context");
        // RedisFuture is a CompletionStage, so the callback can be chained directly
        // instead of blocking a pool thread with get()
        redisFuture.thenAccept(result -> {
            if (result == null) {
                // No entry in Redis for this key: drop the record
                resultFuture.complete(Collections.emptyList());
                return;
            }
            resultFuture.complete(Collections.singleton(JSON.parseObject(result).get("userId").toString()));
        }).exceptionally(e -> {
            // Surface lookup failures to Flink instead of swallowing them
            resultFuture.completeExceptionally(e);
            return null;
        });
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (connection != null) {
            connection.close();
        }
        if (redisClient != null) {
            redisClient.shutdown();
        }
    }

    @Override
    public void timeout(String input, ResultFuture<String> resultFuture) throws Exception {
        // Timed out: log and drop the record. The future must still be completed,
        // otherwise the operator would wait on this slot forever.
        System.out.println("Timed out: " + input);
        resultFuture.complete(Collections.emptyList());
        // Alternatively, retry the lookup:
        // asyncInvoke(input, resultFuture);
    }
}
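
Before running end to end, Redis needs the hashes the function reads (HGET userlogin:<eventId> context, where the value is JSON containing a numeric userId), and PostgreSQL needs the test(id) table the JDBC sink inserts into. The one-off helper below is a hypothetical sketch under exactly those assumptions, reusing the connection settings from the code above.

package Pulsar;

import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

// Hypothetical one-off setup helper; not part of the original post.
public class SeedFixtures {
    public static void main(String[] args) throws Exception {
        // Seed the hashes that asyncInvoke() reads: HGET userlogin:<eventId> context
        RedisClient redisClient = RedisClient.create("redis://localhost");
        try (StatefulRedisConnection<String, String> conn = redisClient.connect()) {
            for (int i = 0; i < 10; i++) {
                conn.sync().hset("userlogin:" + i, "context", "{\"userId\":" + (100 + i) + "}");
            }
        }
        redisClient.shutdown();

        // Create the table the JdbcSink writes to
        try (Connection pg = DriverManager.getConnection(
                "jdbc:postgresql://localhost/postgres", "postgres", "mysecretpassword");
             Statement st = pg.createStatement()) {
            st.execute("create table if not exists test (id integer)");
        }
    }
}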

pom.xml:

<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>frauddetection</groupId>
	<artifactId>frauddetection</artifactId>
	<version>0.1</version>
	<packaging>jar</packaging>

	<name>Flink Walkthrough DataStream Java</name>
	<url>https://flink.apache.org</url>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<flink.version>1.14.4</flink.version>
		<target.java.version>1.8</target.java.version>
		<java.version>1.8</java.version>
		<scala.binary.version>2.11</scala.binary.version>
		<maven.compiler.source>${target.java.version}</maven.compiler.source>
		<maven.compiler.target>${target.java.version}</maven.compiler.target>
		<log4j.version>2.17.1</log4j.version>
	</properties>



	<dependencies>
		<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>1.2.79</version>
		</dependency>
		<!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
		<dependency>
			<groupId>redis.clients</groupId>
			<artifactId>jedis</artifactId>
			<version>3.8.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>


		<!-- This dependency is provided, because it should not be packaged into the JAR file. -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
			<scope>compile</scope>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
			<scope>compile</scope>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-scala_2.11</artifactId>
			<version>${flink.version}</version>
			<scope>compile</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
			<scope>compile</scope>

		</dependency>
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-pulsar -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-pulsar_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>

		<!-- https://mvnrepository.com/artifact/org.postgresql/postgresql -->
		<dependency>
			<groupId>org.postgresql</groupId>
			<artifactId>postgresql</artifactId>
			<version>42.3.3</version>
		</dependency>

		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-runtime-web-->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
			<scope>compile</scope>
		</dependency>

		<dependency>
			<groupId>io.lettuce</groupId>
			<artifactId>lettuce-core</artifactId>
			<version>5.0.5.RELEASE</version>
		</dependency>

		<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core 	-->
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-core_2.11</artifactId>
			<version>2.4.8</version>
		</dependency>

		<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-->
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-sql_2.11</artifactId>
			<version>2.4.8</version>
			<scope>provided</scope>
		</dependency>


		<dependency>
			<groupId>com.redislabs</groupId>
			<artifactId>spark-redis_2.11</artifactId>
			<version>2.4.2</version>
		</dependency>



		<!-- Add connector dependencies here. They must be in the default scope (compile). -->

		<!-- Example:

		<dependency>
		    <groupId>org.apache.flink</groupId>
		    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
		    <version>${flink.version}</version>
		</dependency>
		-->

		<!-- Add logging framework, to produce console output when running in the IDE. -->
		<!-- These dependencies are excluded from the application JAR by default. -->
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-slf4j-impl</artifactId>
			<version>${log4j.version}</version>
			<scope>compile</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-api</artifactId>
			<version>${log4j.version}</version>
			<scope>compile</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-core</artifactId>
			<version>${log4j.version}</version>
			<scope>compile</scope>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<version>1.18.24</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.13.2</version>
			<scope>test</scope>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-state-processor-api_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<!--
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-test-utils-junit</artifactId>
			<version>1.14.4</version>
			<scope>test</scope>
		</dependency>
		-->
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-shade-plugin</artifactId>
				<version>3.0.0</version>
				<executions>
					<!-- Run shade goal on package phase -->
					<execution>
						<phase>package</phase>
						<goals>
							<goal>shade</goal>
						</goals>
						<configuration>
							<artifactSet>
								<excludes>
									<exclude>org.apache.flink:flink-shaded-force-shading</exclude>
									<exclude>com.google.code.findbugs:jsr305</exclude>
									<exclude>org.slf4j:*</exclude>
									<exclude>org.apache.logging.log4j:*</exclude>
								</excludes>
							</artifactSet>
							<filters>
								<filter>
									<!-- Do not copy the signatures in the META-INF folder.
									Otherwise, this might cause SecurityExceptions when using the JAR. -->
									<artifact>*:*</artifact>
									<excludes>
										<exclude>META-INF/*.SF</exclude>
										<exclude>META-INF/*.DSA</exclude>
										<exclude>META-INF/*.RSA</exclude>
									</excludes>
								</filter>
							</filters>
							<transformers>
								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
									<mainClass>Pulsar.test</mainClass>
								</transformer>
							</transformers>
						</configuration>
					</execution>
				</executions>
			</plugin>


			<!-- Java Compiler -->
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.1</version>
				<configuration>
					<source>${target.java.version}</source>
					<target>${target.java.version}</target>
				</configuration>
			</plugin>
			<!-- Scala Compiler -->
			<plugin>
				<groupId>net.alchim31.maven</groupId>
				<artifactId>scala-maven-plugin</artifactId>
				<version>3.2.2</version>
				<executions>
					<execution>
						<goals>
							<goal>compile</goal>
							<goal>testCompile</goal>
						</goals>
					</execution>
				</executions>
				<configuration>
					<args>
						<arg>-nobootcp</arg>
					</args>
				</configuration>
			</plugin>
			<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
			<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->

		</plugins>

		<pluginManagement>
			<plugins>

				<!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
				<plugin>
					<groupId>org.eclipse.m2e</groupId>
					<artifactId>lifecycle-mapping</artifactId>
					<version>1.0.0</version>
					<configuration>
						<lifecycleMappingMetadata>
							<pluginExecutions>
								<pluginExecution>
									<pluginExecutionFilter>
										<groupId>org.apache.maven.plugins</groupId>
										<artifactId>maven-shade-plugin</artifactId>
										<versionRange>[3.0.0,)</versionRange>
										<goals>
											<goal>shade</goal>
										</goals>
									</pluginExecutionFilter>
									<action>
										<ignore/>
									</action>
								</pluginExecution>
								<pluginExecution>
									<pluginExecutionFilter>
										<groupId>org.apache.maven.plugins</groupId>
										<artifactId>maven-compiler-plugin</artifactId>
										<versionRange>[3.1,)</versionRange>
										<goals>
											<goal>testCompile</goal>
											<goal>compile</goal>
										</goals>
									</pluginExecutionFilter>
									<action>
										<ignore/>
									</action>
								</pluginExecution>
							</pluginExecutions>
						</lifecycleMappingMetadata>
					</configuration>
				</plugin>
			</plugins>
		</pluginManagement>
	</build>
</project>
