Integrating Databricks with Event Hubs on Azure

文凯康
2023-12-01

Event Hub: https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-what-is-event-hubs

Event Hubs and Databricks integration: https://docs.azuredatabricks.net/spark/latest/structured-streaming/streaming-event-hubs.html

Reading the docs alone can feel dry, so here we simulate a simple Spark streaming job to show how Databricks can use Event Hubs as the data source. First, create an Event Hub in Azure and get its connection string; with that string we can simulate pushing a stream of events into the hub. A simple example follows.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
using System.Threading;

namespace ConsoleApplication1
{
    class SendMessageToEventHubs
    {
        private readonly string _connectionString;
        private readonly string _entityPath;
        private readonly string _eventhubNamespaceName;
        private EventHubClient client;

        /// <summary>
        /// Constructor: keeps the connection string and entity path,
        /// and derives the namespace name from the endpoint URL.
        /// </summary>
        /// <param name="connectionString"></param>
        /// <param name="entityPath"></param>
        public SendMessageToEventHubs(string connectionString, string entityPath)
        {
            _connectionString = connectionString;
            _entityPath = entityPath;
            // The endpoint looks like "Endpoint=sb://<namespace>.servicebus.windows.net/;...",
            // so the namespace name sits between "://" and ".servicebus.windows.net".
            var prefix = "://";
            var urlSuffix = ".servicebus.windows.net";
            var startIndex = _connectionString.IndexOf(prefix) + prefix.Length;
            var endIndex = _connectionString.IndexOf(urlSuffix);
            _eventhubNamespaceName = _connectionString.Substring(startIndex, endIndex - startIndex);
            client = GetClient();
        }

        public EventHubClient GetClient()
        {
            if (client == null)
            {
                // Pass the entity path explicitly so the connection string
                // itself does not need to contain an EntityPath part.
                client = EventHubClient.CreateFromConnectionString(_connectionString, _entityPath);
                client.RetryPolicy = RetryPolicy.Default;
            }

            return client;
        }

        public void Cleanup()
        {
            if(client != null)
            {
                client.Close();
            }
        }

        public void Send()
        {
            Random rnd = new Random();
            while (true)
            {
                // Payload format "EventHub;yyyyMMdd HH:mm:ss" matches the split(";")
                // parsing in the Scala job below; the timestamp is shifted back by a
                // random 0-30 minutes to exercise the watermark downstream.
                var payload = "EventHub;" + DateTime.Now.AddMinutes(-rnd.Next(30)).ToString("yyyyMMdd HH:mm:ss");
                EventData eventData = new EventData(Encoding.ASCII.GetBytes(payload));
                client.Send(eventData);
                Console.WriteLine("Sent data: " + payload);
                Thread.Sleep(1000 * 60); // one event per minute
            }
        }
    }
}

After this program runs for a while, the Event Hub blade in the Azure portal shows that the request (Requests) and message (Messages) counters are no longer zero. Once the hub holds data, we need to pull it back out as the input of a Spark streaming job; the Scala code below does exactly that.
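One prerequisite: the job depends on the Azure Event Hubs connector for Spark, which on Databricks is typically attached to the cluster as a Maven library. A minimal sbt sketch; the version (2.3.13) and Scala suffix (_2.11) are assumptions that must match your cluster's Spark and Scala build:

// sbt: Azure Event Hubs connector for Spark (coordinates are illustrative)
libraryDependencies += "com.microsoft.azure" % "azure-eventhubs-spark_2.11" % "2.3.13"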

import org.apache.log4j.Logger
import org.apache.spark.eventhubs.{ConnectionStringBuilder, EventHubsConf, EventPosition}
import org.apache.spark.sql.functions.split
import org.apache.spark.sql.functions.to_timestamp
import org.apache.spark.sql.SparkSession

object AFLogProcessApp extends App {
  @transient private[this] lazy val logger = Logger.getLogger(getClass.getName)
  val spark = SparkSession
    .builder
    .appName("AF")
    .getOrCreate()

  spark.sparkContext.setCheckpointDir("/temp")

  import spark.implicits._

  // To connect to an Event Hub, EntityPath is required as part of the connection string.
  // The string below already carries it; setEventHubName would fill it in if the
  // string copied from the Azure portal lacked the EntityPath part.
  val connectionString = ConnectionStringBuilder("Endpoint=sb://**.windows.net/;SharedAccessKeyName=SharedAccessKey;SharedAccessKey=**/T0w=;EntityPath=eventhubtest")
    .setEventHubName("eventhubtest")
    .build
  val eventHubsConf = EventHubsConf(connectionString)
    .setStartingPosition(EventPosition.fromEndOfStream)

  val streamingInputDF =
    spark
      .readStream // DataStreamReader
      .format("eventhubs") // DataStreamReader
      .options(eventHubsConf.toMap) // DataStreamReader
      .load() // DataFrame

  // split each message body on ';' into a name column and an event-time column
  val df = streamingInputDF.select($"body".cast("string"))
    .withColumn("_tmp", split($"body", ";"))
    .select(
      $"_tmp".getItem(0).as("name"),
      $"_tmp".getItem(1).as("ptime")
    ).drop("_tmp")
    .withColumn("posttime", to_timestamp($"ptime", "yyyyMMdd HH:mm:ss"))
    .drop("ptime")
    .withWatermark("posttime", "15 minutes")

  val query = df
    .writeStream
    .outputMode("append")
    .format("console")
    .start()

  query.awaitTermination() // block until the streaming query terminates
  spark.stop()
}
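One knob worth calling out: EventPosition.fromEndOfStream makes the job see only events published after it starts. To replay everything already sitting in the hub, the connector also exposes fromStartOfStream, optionally throttled per micro-batch; a small sketch, where the 1000-event cap is purely illustrative:

// Replay the hub from the beginning, at most 1000 events per micro-batch
val replayConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromStartOfStream)
  .setMaxEventsPerTrigger(1000)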

Once this runs on Databricks, the log contains output like the following, which shows that data is being pulled out of the Event Hub successfully.

-------------------------------------------
Batch: 24
-------------------------------------------
+--------+-------------------+
|    name|           posttime|
+--------+-------------------+
|EventHub|2018-06-19 10:29:09|
+--------+-------------------+
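Note that the 15-minute watermark above has no visible effect until an event-time aggregation is added. A minimal sketch of such an aggregation on the df built above, counting events per name in 10-minute windows (the window size is illustrative):

import org.apache.spark.sql.functions.window

// With an aggregation in place, the watermark bounds window state, and in
// append mode a window is emitted only after the watermark passes its end.
val counts = df
  .groupBy(window($"posttime", "10 minutes"), $"name")
  .count()

val windowedQuery = counts.writeStream
  .outputMode("append")
  .format("console")
  .start()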
