我们常常会有这样的应用场景,就是一个复杂系统可能是由多个模块组成,然后每个模块都会有自己的logging系统。最后我们在处理日志数据的时候,会希望能够把不同模块的日志数据join在一起。一个简单的例子就是,广告有显示的日志,然后当用户点击了广告后又会有点击的日志,那么如果我想把广告信息和点击信息组合在一起就需要从这两个日志流源得到的数据进行join。这在API文档里面有相应的介绍Stream-Stream-Joins.
下面这个例子模拟的是,我们往两个eventhubs分别发送信息,包括id、value以及事件时间。对于每一个id,假定都会给两个eventhubs更发送一次信息。下面的streaming代码则是来测试从这两个eventhubs数据源读取数据进行join的操作。
public void Send()
{
Random rnd = new Random();
int id = 100000;
while (true)
{
var x = rnd.Next(30);
var payload1 = id.ToString() + ";" + x.ToString() + ";" + DateTime.Now.AddMinutes(-x).ToString("yyyyMMdd HH:mm:ss");
EventData eventData1 = new EventData(Encoding.ASCII.GetBytes(payload1));
client.Send(eventData1);
Console.WriteLine("Send data '" + payload1 + "' to event hub 1");
var y = rnd.Next(30);
var payload2 = id.ToString() + ";" + x.ToString() + ";" + DateTime.Now.AddMinutes(-y).ToString("yyyyMMdd HH:mm:ss");
EventData eventData2 = new EventData(Encoding.ASCII.GetBytes(payload2));
client2.Send(eventData2);
Console.WriteLine("Send data '" + payload2 + "' to event hub 2");
Thread.Sleep(1000 * 60);
id++;
}
}
val df = streamingInputDF.select($"body".cast("string"))
.withColumn("_tmp", split($"body", ";"))
.select(
$"_tmp".getItem(0).as("id"),
$"_tmp".getItem(1).as("value1"),
$"_tmp".getItem(2).as("ptime1")
).drop("_tmp")
.withColumn("posttime1", to_timestamp($"ptime1", "yyyyMMdd HH:mm:ss"))
.drop("ptime1")
.withWatermark("posttime1", "15 minutes")
val df2 = streamingInputDF2.select($"body".cast("string"))
.withColumn("_tmp", split($"body", ";"))
.select(
$"_tmp".getItem(0).as("id2"),
$"_tmp".getItem(1).as("value2"),
$"_tmp".getItem(2).as("ptime2")
).drop("_tmp")
.withColumn("posttime2", to_timestamp($"ptime2", "yyyyMMdd HH:mm:ss"))
.drop("ptime2")
.withWatermark("posttime2", "15 minutes")
val dff = df
.join(
df2,
expr("""
id = id2 AND
posttime1 >= posttime2 - interval 15 minutes AND
posttime1 <= posttime2 + interval 15 minutes
"""),
joinType = "leftOuter"
)
.writeStream
.outputMode("append")
.format("console")
.start()
dff.awaitTermination()
在databricks上运行这个代码,下面是输出的结果。这里两个streaming都有watermark,因而都会抛弃掉一些delay比较多的log。然后在join阶段,这里不但用了id作为join key,还兼顾了event time来保证两个流的时间比较接近从而使join成功。但是因为根据API介绍,目前join无法支持full outer join模式,这将会是一个缺陷。
-------------------------------------------
Batch: 247
-------------------------------------------
+------+------+-------------------+------+------+-------------------+
| id|value1| posttime1| id2|value2| posttime2|
+------+------+-------------------+------+------+-------------------+
|100103| 10|2018-06-26 14:59:17|100103| 10|2018-06-26 15:06:17|
|100071| 8|2018-06-26 14:29:04| null| null| null|
+------+------+-------------------+------+------+-------------------+
-------------------------------------------
Batch: 248
-------------------------------------------
+---+------+---------+---+------+---------+
| id|value1|posttime1|id2|value2|posttime2|
+---+------+---------+---+------+---------+
+---+------+---------+---+------+---------+
-------------------------------------------
Batch: 249
-------------------------------------------
+------+------+-------------------+------+------+-------------------+
| id|value1| posttime1| id2|value2| posttime2|
+------+------+-------------------+------+------+-------------------+
|100075| 10|2018-06-26 14:31:06| null| null| null|
|100104| 5|2018-06-26 15:05:17|100104| 5|2018-06-26 15:05:18|
+------+------+-------------------+------+------+-------------------+