This is again a simple simulation: two event hubs are created, and streaming data is sent to both of them at the same time.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
using System.Threading;

namespace ConsoleApplication1
{
    class SendMessageToEventHubs
    {
        private readonly string _connectionString = "Endpoint=*";
        private readonly string _connectionString2 = "Endpoint=*";
        private EventHubClient client, client2;

        public SendMessageToEventHubs()
        {
            client = GetClient();
            client2 = GetClient2();
        }

        public EventHubClient GetClient()
        {
            if (client == null)
            {
                client = EventHubClient.CreateFromConnectionString(_connectionString);
                client.RetryPolicy = RetryPolicy.Default;
            }
            return client;
        }

        public EventHubClient GetClient2()
        {
            if (client2 == null)
            {
                client2 = EventHubClient.CreateFromConnectionString(_connectionString2);
                client2.RetryPolicy = RetryPolicy.Default;
            }
            return client2;
        }

        public void Cleanup()
        {
            if (client != null)
            {
                client.Close();
            }
            if (client2 != null)
            {
                client2.Close();
            }
        }

        public void Send()
        {
            Random rnd = new Random();
            while (true)
            {
                // Build a payload of the form "EventHub;<timestamp>", with the timestamp
                // shifted back by a random number of minutes to simulate late events.
                var x = rnd.Next(30);
                var payload = "EventHub;" + DateTime.Now.AddMinutes(-x).ToString("yyyyMMdd HH:mm:ss");
                EventData eventData = new EventData(Encoding.ASCII.GetBytes(payload));

                // Alternate between the two event hubs based on the random offset.
                if (x % 2 == 0)
                    client.Send(eventData);
                else
                    client2.Send(eventData);

                Console.WriteLine("Send data '" + payload + "' to event hub " + (x % 2 == 0 ? "1" : "2"));
                Thread.Sleep(1000 * 60);
            }
        }
    }
}
In the Spark Streaming program, both event hubs are read as input sources and then unioned before the result is written out.
package com.xx.naf

import org.apache.log4j.Logger
import org.apache.spark.eventhubs.{ConnectionStringBuilder, EventHubsConf, EventPosition}
import org.apache.spark.sql.functions.split
import org.apache.spark.sql.functions.to_timestamp
import org.apache.spark.sql.functions.window
import org.apache.spark.sql.SparkSession

object LogProcessApp extends App {
  @transient private[this] lazy val logger = Logger.getLogger(getClass.getName)

  val spark = SparkSession
    .builder
    .appName("NAF")
    .getOrCreate()
  spark.sparkContext.setCheckpointDir("/temp")

  import spark.implicits._

  // To connect to an Event Hub, EntityPath is required as part of the connection string.
  val connectionString = ConnectionStringBuilder("Endpoint=str*")
    .setEventHubName("eventhubtest")
    .build
  val eventHubsConf = EventHubsConf(connectionString)
    .setStartingPosition(EventPosition.fromEndOfStream)
  val streamingInputDF =
    spark
      .readStream                    // DataStreamReader
      .format("eventhubs")           // DataStreamReader
      .options(eventHubsConf.toMap)  // DataStreamReader
      .load()                        // DataFrame

  val connectionString2 = ConnectionStringBuilder("Endpoint=str*")
    .setEventHubName("eventhub2")
    .build
  val eventHubsConf2 = EventHubsConf(connectionString2)
    .setStartingPosition(EventPosition.fromEndOfStream)
  val streamingInputDF2 =
    spark
      .readStream                     // DataStreamReader
      .format("eventhubs")            // DataStreamReader
      .options(eventHubsConf2.toMap)  // DataStreamReader
      .load()                         // DataFrame

  // Split the body on ';' into a name column and an event-time column.
  val df = streamingInputDF.select($"body".cast("string"))
    .withColumn("_tmp", split($"body", ";"))
    .select(
      $"_tmp".getItem(0).as("name"),
      $"_tmp".getItem(1).as("ptime")
    ).drop("_tmp")
    .withColumn("posttime", to_timestamp($"ptime", "yyyyMMdd HH:mm:ss"))
    .drop("ptime")

  val df2 = streamingInputDF2.select($"body".cast("string"))
    .withColumn("_tmp", split($"body", ";"))
    .select(
      $"_tmp".getItem(0).as("name"),
      $"_tmp".getItem(1).as("ptime")
    ).drop("_tmp")
    .withColumn("posttime", to_timestamp($"ptime", "yyyyMMdd HH:mm:ss"))
    .drop("ptime")

  // Union the two streams, apply a 15-minute watermark on the event time,
  // and count per 5-minute window. dff is the StreamingQuery returned by start().
  val dff = df
    .union(df2)
    .withWatermark("posttime", "15 minutes")
    .groupBy(
      window($"posttime", "5 minutes", "5 minutes"),
      $"name"
    )
    .count()
    .writeStream
    .outputMode("update")
    .format("console")
    .start()

  dff.awaitTermination()
  spark.stop()
}
If you run the program above, you will notice that start() is called on the query built from df, which originates from event hub 1, so the micro-batches in the output are driven by event hub 1; the counts, however, also take the data from event hub 2 into account. For example, batch 513 below is empty because the event hub 1 data was dropped, while the other two batches show results because event hub 1 had events falling into the corresponding window.
-------------------------------------------
Batch: 512
-------------------------------------------
+--------------------+--------+-----+
| window| name|count|
+--------------------+--------+-----+
|[2018-06-21 10:30...|EventHub| 4|
+--------------------+--------+-----+
2018-06-21T02:54:57.684+0000: [GC (System.gc()) [PSYoungGen: 1903045K->29020K(2098688K)] 2425018K->550993K(6372352K), 0.0169861 secs] [Times: user=0.05 sys=0.00, real=0.01 secs]
2018-06-21T02:54:57.701+0000: [Full GC (System.gc()) [PSYoungGen: 29020K->0K(2098688K)] [ParOldGen: 521973K->517894K(4273664K)] 550993K->517894K(6372352K), [Metaspace: 178014K->177795K(1208320K)], 0.6440961 secs] [Times: user=2.04 sys=0.00, real=0.65 secs]
-------------------------------------------
Batch: 513
-------------------------------------------
+------+----+-----+
|window|name|count|
+------+----+-----+
+------+----+-----+
2018-06-21T02:57:51.797+0000: [GC (Allocation Failure) [PSYoungGen: 2063872K->15625K(2100224K)] 2581766K->533519K(6373888K), 0.0223677 secs] [Times: user=0.05 sys=0.00, real=0.02 secs]
-------------------------------------------
Batch: 514
-------------------------------------------
+--------------------+--------+-----+
| window| name|count|
+--------------------+--------+-----+
|[2018-06-21 10:30...|EventHub| 6|
+--------------------+--------+-----+
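To double-check which Event Hub actually fed a given micro-batch, one option is to poll the streaming query's progress report, where each input source is listed with the number of rows it contributed. The sketch below is not part of the original program; the monitorProgress helper and the one-minute polling interval are assumptions, and it only relies on Spark's StreamingQuery progress API.
import org.apache.spark.sql.streaming.StreamingQuery

// Hypothetical helper: periodically print per-source row counts for a running query.
// Assumes `query` is the StreamingQuery returned by writeStream.start() (dff above).
def monitorProgress(query: StreamingQuery): Unit = {
  new Thread(new Runnable {
    override def run(): Unit = {
      while (query.isActive) {
        val progress = query.lastProgress
        if (progress != null) {
          progress.sources.foreach { s =>
            // `description` identifies the Event Hubs source, `numInputRows`
            // is how many rows that source contributed to the last micro-batch.
            println(s"batch=${progress.batchId} source=${s.description} rows=${s.numInputRows}")
          }
        }
        Thread.sleep(60 * 1000)
      }
    }
  }).start()
}
If such per-source verification is needed, monitorProgress(dff) could be called right after start() and before awaitTermination().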