Loading multiple Event Hub streaming sources simultaneously on Azure Databricks

楚丰羽
2023-12-01

This is another simple simulation: create two Event Hubs, then simulate sending streaming data to both of them at the same time. The C# sender below (using the legacy Microsoft.ServiceBus SDK) alternates randomly between the two hubs.

using System;
using System.Text;
using System.Threading;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;

namespace ConsoleApplication1
{
    class SendMessageToEventHubs
    {
        // Both connection strings are redacted; each must include the EntityPath
        // of the target Event Hub.
        private readonly string _connectionString = "Endpoint=*";
        private readonly string _connectionString2 = "Endpoint=*";
        private EventHubClient client, client2;

        public SendMessageToEventHubs()
        {
            client = GetClient();
            client2 = GetClient2();
        }

        public EventHubClient GetClient()
        {
            if(client == null)
            {
                client = EventHubClient.CreateFromConnectionString(_connectionString);
                client.RetryPolicy = RetryPolicy.Default;
            }

            return client;
        }

        public EventHubClient GetClient2()
        {
            if (client2 == null)
            {
                client2 = EventHubClient.CreateFromConnectionString(_connectionString2);
                client2.RetryPolicy = RetryPolicy.Default;
            }

            return client2;
        }

        public void Cleanup()
        {
            if(client != null)
            {
                client.Close();
            }
            if (client2 != null)
            {
                client2.Close();
            }
        }

        public void Send()
        {
            Random rnd = new Random();
            while (true)
            {
                // Back-date each event by a random 0-29 minutes so events spread
                // across different 5-minute windows on the Spark side.
                var x = rnd.Next(30);
                var payload = "EventHub;" + DateTime.Now.AddMinutes(-x).ToString("yyyyMMdd HH:mm:ss");
                EventData eventData = new EventData(Encoding.ASCII.GetBytes(payload));
                // Route to hub 1 or hub 2 based on the parity of the random offset.
                if (x % 2 == 0)
                    client.Send(eventData);
                else
                    client2.Send(eventData);
                Console.WriteLine("Send data '" + payload + "' to event hub " + (x % 2 == 0 ? "1" : "2"));
                Thread.Sleep(1000 * 60); // one event per minute
            }
        }
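
        // Hypothetical entry point (not in the original post): run the sender
        // until the process is interrupted, then close both clients.
        static void Main(string[] args)
        {
            var sender = new SendMessageToEventHubs();
            try
            {
                sender.Send();
            }
            finally
            {
                sender.Cleanup();
            }
        }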
    }
}

In the Spark streaming program, we read the two Event Hub input sources at the same time and then union them for output.

package com.xx.naf

import org.apache.log4j.Logger
import org.apache.spark.eventhubs.{ConnectionStringBuilder, EventHubsConf, EventPosition}
import org.apache.spark.sql.functions.{split, to_timestamp, window}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object LogProcessApp extends App {
  @transient private[this] lazy val logger = Logger.getLogger(getClass.getName)
  val spark = SparkSession
    .builder
    .appName("NAF")
    .getOrCreate()

  spark.sparkContext.setCheckpointDir("/temp")

  import spark.implicits._

  // To connect to an Event Hub, EntityPath is required as part of the connection string.
  val connectionString = ConnectionStringBuilder("Endpoint=str*")
    .setEventHubName("eventhubtest")
    .build
  val eventHubsConf = EventHubsConf(connectionString)
    .setStartingPosition(EventPosition.fromEndOfStream)

  // First source: a streaming DataFrame whose binary "body" column carries the payload.
  val streamingInputDF =
    spark
      .readStream
      .format("eventhubs")
      .options(eventHubsConf.toMap)
      .load()

  // Second source, pointing at the second Event Hub.
  val connectionString2 = ConnectionStringBuilder("Endpoint=str*")
    .setEventHubName("eventhub2")
    .build
  val eventHubsConf2 = EventHubsConf(connectionString2)
    .setStartingPosition(EventPosition.fromEndOfStream)
  val streamingInputDF2 =
    spark
      .readStream
      .format("eventhubs")
      .options(eventHubsConf2.toMap)
      .load()

  // Split the "name;timestamp" payload on ';' and parse the second field as an event-time column.
  val df = streamingInputDF.select($"body".cast("string"))
    .withColumn("_tmp", split($"body", ";"))
    .select(
      $"_tmp".getItem(0).as("name"),
      $"_tmp".getItem(1).as("ptime")
    ).drop("_tmp")
    .withColumn("posttime", to_timestamp($"ptime", "yyyyMMdd HH:mm:ss"))
    .drop("ptime")

  val df2 = streamingInputDF2.select($"body".cast("string"))
    .withColumn("_tmp", split($"body", ";"))
    .select(
      $"_tmp".getItem(0).as("name"),
      $"_tmp".getItem(1).as("ptime")
    ).drop("_tmp")
    .withColumn("posttime", to_timestamp($"ptime", "yyyyMMdd HH:mm:ss"))
    .drop("ptime")

  // Union the two sources, apply a 15-minute watermark on the event time, and
  // count events per (5-minute tumbling window, name) pair.
  val dff = df
    .union(df2)
    .withWatermark("posttime", "15 minutes")
    .groupBy(
      window($"posttime", "5 minutes", "5 minutes"),
      $"name"
    )
    .count()
    .writeStream
    .outputMode("update")
    .format("console")
    // 60-second micro-batches, replacing the unused legacy StreamingContext(Seconds(60))
    // the original mixed in from the DStream API.
    .trigger(Trigger.ProcessingTime("60 seconds"))
    .start()

  dff.awaitTermination()
  spark.stop()
}
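
Since the union collapses both hubs into a single stream, the console output cannot tell you which hub a row came from. A small variation that can be dropped into the same program (the "source" column name and the hub labels are my own, not from the original code) is to tag each stream before the union and add the tag to the grouping:

import org.apache.spark.sql.functions.lit

// Illustrative only: tag each stream so the counts are attributable per hub.
val tagged = df.withColumn("source", lit("hub1"))
  .union(df2.withColumn("source", lit("hub2")))
  .withWatermark("posttime", "15 minutes")
  .groupBy(window($"posttime", "5 minutes"), $"source", $"name")
  .count()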

Running the program above, you will notice that `start` is invoked on the query built from df, which comes from event hub 1, so the output batches are also driven by event hub 1; the counts, however, do take event hub 2's data into account. For example, batch 513 below is empty because the event hub 1 data had been disposed of, while the other two batches produce output because event hub 1 had events falling into the corresponding windows.

-------------------------------------------
Batch: 512
-------------------------------------------
+--------------------+--------+-----+
|              window|    name|count|
+--------------------+--------+-----+
|[2018-06-21 10:30...|EventHub|    4|
+--------------------+--------+-----+


2018-06-21T02:54:57.684+0000: [GC (System.gc()) [PSYoungGen: 1903045K->29020K(2098688K)] 2425018K->550993K(6372352K), 0.0169861 secs] [Times: user=0.05 sys=0.00, real=0.01 secs] 
2018-06-21T02:54:57.701+0000: [Full GC (System.gc()) [PSYoungGen: 29020K->0K(2098688K)] [ParOldGen: 521973K->517894K(4273664K)] 550993K->517894K(6372352K), [Metaspace: 178014K->177795K(1208320K)], 0.6440961 secs] [Times: user=2.04 sys=0.00, real=0.65 secs] 
-------------------------------------------
Batch: 513
-------------------------------------------
+------+----+-----+
|window|name|count|
+------+----+-----+
+------+----+-----+


2018-06-21T02:57:51.797+0000: [GC (Allocation Failure) [PSYoungGen: 2063872K->15625K(2100224K)] 2581766K->533519K(6373888K), 0.0223677 secs] [Times: user=0.05 sys=0.00, real=0.02 secs] 
-------------------------------------------
Batch: 514
-------------------------------------------
+--------------------+--------+-----+
|              window|    name|count|
+--------------------+--------+-----+
|[2018-06-21 10:30...|EventHub|    6|
+--------------------+--------+-----+
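
To make the windowing concrete, here is a minimal batch-mode sketch of my own (WindowDemo, the local master, and the sample timestamps are illustrative assumptions, not from the original post) showing that two payloads parsed with the same yyyyMMdd HH:mm:ss pattern land in the same 5-minute tumbling window:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{split, to_timestamp, window}

object WindowDemo extends App {
  val spark = SparkSession.builder.appName("WindowDemo").master("local[*]").getOrCreate()
  import spark.implicits._

  // Two payloads in the same "name;timestamp" format the C# sender produces.
  val events = Seq("EventHub;20180621 10:31:00", "EventHub;20180621 10:33:40")
    .toDF("body")
    .withColumn("posttime", to_timestamp(split($"body", ";").getItem(1), "yyyyMMdd HH:mm:ss"))

  // Both rows land in the [2018-06-21 10:30:00, 2018-06-21 10:35:00) window,
  // so the count for that window is 2.
  events.groupBy(window($"posttime", "5 minutes")).count().show(false)

  spark.stop()
}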

