Event Hub: https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-what-is-event-hubs
Event Hub and Databricks integration: https://docs.azuredatabricks.net/spark/latest/structured-streaming/streaming-event-hubs.html
Reading the documentation alone can be fairly dry, so here we walk through a small simulated Spark streaming job to show how to use an Event Hub as the data source for a streaming job on Databricks. First, create an Event Hub in Azure and get its connection string. With that string we can simulate pushing a stream of events to the hub; below is a simple sender.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
using System.Threading;

namespace ConsoleApplication1
{
    class SendMessageToEventHubs
    {
        private readonly string _connectionString;
        private readonly string _entityPath;
        private readonly string _eventhubNamespaceName;
        private EventHubClient client;

        /// <summary>
        /// Creates a sender for the given Event Hub.
        /// </summary>
        /// <param name="connectionString">Event Hub connection string from the Azure portal</param>
        /// <param name="entityPath">Name of the Event Hub entity</param>
        public SendMessageToEventHubs(string connectionString, string entityPath)
        {
            _connectionString = connectionString;
            _entityPath = entityPath;

            // Extract the namespace name from the connection string,
            // e.g. "Endpoint=sb://<namespace>.servicebus.windows.net/;...".
            var prefix = "://";
            var urlSuffix = ".servicebus.windows.net";
            var startIndex = _connectionString.IndexOf(prefix) + prefix.Length;
            var endIndex = _connectionString.IndexOf(urlSuffix);
            _eventhubNamespaceName = _connectionString.Substring(startIndex, endIndex - startIndex);

            client = GetClient();
        }

        public EventHubClient GetClient()
        {
            if (client == null)
            {
                client = EventHubClient.CreateFromConnectionString(_connectionString);
                client.RetryPolicy = RetryPolicy.Default;
            }
            return client;
        }

        public void Cleanup()
        {
            if (client != null)
            {
                client.Close();
            }
        }

        public void Send()
        {
            Random rnd = new Random();
            while (true)
            {
                // Payload format: "EventHub;yyyyMMdd HH:mm:ss", with the timestamp
                // shifted back by a random number of minutes (0-29).
                var payload = "EventHub;" + DateTime.Now.AddMinutes(-rnd.Next(30)).ToString("yyyyMMdd HH:mm:ss");
                EventData eventData = new EventData(Encoding.ASCII.GetBytes(payload));
                client.Send(eventData);
                Console.WriteLine("Send data on " + payload);
                Thread.Sleep(1000 * 60);
            }
        }
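
        // A minimal entry-point sketch (not part of the original snippet): the connection
        // string and Event Hub name below are placeholders and must be replaced with your own.
        static void Main(string[] args)
        {
            var sender = new SendMessageToEventHubs(
                "Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=<keyName>;SharedAccessKey=<key>",
                "<event hub name>");
            sender.Send();
        }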
    }
}
After running this program, the Event Hub blade in the Azure portal shows that Requests and Messages are no longer zero. Once the Event Hub has data, we need to read it back out as the input of a Spark streaming job. The Scala code below does exactly that (the azure-eventhubs-spark connector has to be attached to the Databricks cluster as a library).
import org.apache.log4j.Logger
import org.apache.spark.eventhubs.{ConnectionStringBuilder, EventHubsConf, EventPosition}
import org.apache.spark.sql.functions.split
import org.apache.spark.sql.functions.to_timestamp
import org.apache.spark.sql.SparkSession

object AFLogProcessApp extends App {
  @transient private[this] lazy val logger = Logger.getLogger(getClass.getName)

  val spark = SparkSession
    .builder
    .appName("AF")
    .getOrCreate()
  spark.sparkContext.setCheckpointDir("/temp")

  import spark.implicits._

  // Set new runtime options if needed:
  //spark.conf.set("spark.sql.shuffle.partitions", 6)
  //spark.conf.set("spark.executor.memory", "2g")
  // Get all settings:
  //val configMap: Map[String, String] = spark.conf.getAll

  // To connect to an Event Hub, EntityPath (the Event Hub name) must be part of the
  // connection string; setEventHubName adds it in case the string copied from the
  // Azure portal does not already contain it.
  val connectionString = ConnectionStringBuilder("Endpoint=sb://**.windows.net/;SharedAccessKeyName=SharedAccessKey;SharedAccessKey=**/T0w=;EntityPath=eventhubtest")
    .setEventHubName("eventhubtest")
    .build

  // Start reading from the end of the stream, i.e. only events that arrive after the job starts.
  val eventHubsConf = EventHubsConf(connectionString)
    .setStartingPosition(EventPosition.fromEndOfStream)

  val streamingInputDF =
    spark
      .readStream                     // DataStreamReader
      .format("eventhubs")            // DataStreamReader
      .options(eventHubsConf.toMap)   // DataStreamReader
      .load()                         // streaming DataFrame

  // The body is "EventHub;yyyyMMdd HH:mm:ss"; split it on ';' into a name and a timestamp.
  val df = streamingInputDF.select($"body".cast("string"))
    .withColumn("_tmp", split($"body", ";"))
    .select(
      $"_tmp".getItem(0).as("name"),
      $"_tmp".getItem(1).as("ptime")
    )
    .withColumn("posttime", to_timestamp($"ptime", "yyyyMMdd HH:mm:ss"))
    .drop("ptime")
    .withWatermark("posttime", "15 minutes")

  // Print every micro-batch to the console (visible in the Databricks driver log).
  val query = df.writeStream
    .outputMode("append")
    .format("console")
    .start()

  query.awaitTermination()
  spark.stop()
}
After running this on Databricks, output similar to the following appears in the driver log, which indicates the job is successfully pulling data out of the Event Hub.
-------------------------------------------
Batch: 24
-------------------------------------------
+--------+-------------------+
| name| posttime|
+--------+-------------------+
|EventHub|2018-06-19 10:29:09|
+--------+-------------------+
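One note on the 15-minute watermark set above: with a plain append to the console it has no visible effect, because watermarks only matter once the stream is aggregated; they tell Spark when a window can be finalized and its state dropped. As a minimal sketch, assuming the parsed df (columns name and posttime) from the job above, a windowed count could look like this (the 10-minute window size is just an illustrative choice):

import org.apache.spark.sql.functions.window

// Count events per name in 10-minute windows of posttime. With the 15-minute
// watermark already set on df, Spark finalizes a window (and drops its state)
// once the watermark passes the window end, so "append" mode only emits
// completed windows.
val windowedCounts = df
  .groupBy(window($"posttime", "10 minutes"), $"name")
  .count()

val windowedQuery = windowedCounts.writeStream
  .outputMode("append")
  .format("console")
  .start()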