当前位置: 首页 > 工具软件 > Pilosa > 使用案例 >

Pilosa文档翻译(三)示例

柯波峻
2023-12-01

原文地址:https://www.pilosa.com/docs/latest/examples/
位图数据库:Pilosa查询十亿级出租车搭乘数据案例

简单说明 Introduction

纽约市发布了一个非常详细的包含了超过10亿条出租车搭乘数据的集合。该数据已经成为科技博客分析的热门目标,并且已经得到了很好的研究。出于这个原因,我们认为将这些数据导入Pilosa,以便确定同一数据集情况下与其他数据存储和技术进行比较。

一般来说,传输(Transportation)是Pilosa的值得关注的用例,因为它通常涉及多个不同的数据源,以及高速率,实时和极大量的数据(特别是如果想得出合理的结论)。

我们编写了一个工具来帮助将NYC(纽约市)出租车数据导入Pilosa这个工具是PDK(Pilosa开发工具包)的一部分,并利用了许多可重复使用的模块,这些模块也可以帮助您导入其他数据。 接下来,我们将逐步解释整个过程。

初始设置之后,PDK导入工具会执行我们定义Pilosa架构所需的一切,相应地将数据映射到位图,然后将其导入Pilosa

数据模型 Data Model

纽约出租车数据由以下列出的许多csv文件组成:http://www.nyc.gov/html/tlc/html/about/trip_record_data.shtml。 这些数据文件大约有20列,其中大约一半与我们正在研究的基准查询相关:

  • 距离(Distance): miles(英里), floating point(浮点值)
  • 车费(Fare): dollars(美元), floating point(浮点值)
  • 乘客人数(Number of passengers): integer(整数值)
  • 下车位置(Dropoff location): latitude and longitude(经纬度), floating point(浮点值)
  • 上车位置(Pickup location): latitude and longitude(经纬度), floating point(浮点值)
  • 下车时间(Dropoff time): timestamp(时间戳)
  • 上车时间(Pickup time): timestamp(时间戳)

注意:下面表格中的row ID是指记录的取值范围,不要理解成MySQL等数据库的rowID。
我们导入这些字段,从每个字段创建一个或多个Pilosa字段:

字段(Field)映射(Mapping)
cab_type(出租车类型)直接映射整数枚举值 → row ID
dist_miles(距离)四舍五入round(dist) → row ID
total_amount_dollars(总金额)四舍五入round(dist) → row ID
passenger_count(乘车人数)直接映射整数值 → row ID
drop_grid_id(下车位置网格ID)(lat, lon) → 100x100矩形分割网格 → cell(格子) ID
drop_year年份year(timestamp) → row ID
drop_month月份month(timestamp) → row ID
drop_day该月第几天day(timestamp) → row ID
drop_time(下车时间)该天中的时间映射到48个半小时组成的桶中
pickup_grid_id(下车位置网格ID)(lat, lon) → 100x100矩形分割网格 → cell ID
pickup_yearyear(timestamp) → row ID
pickup_monthmonth(timestamp) → row ID
pickup_dayday(timestamp) → row ID
pickup_time(上车时间)time of day mapped to one of 48 half-hour buckets → row ID

我们还创建了两个附加字段表示持续时间和每一次乘坐的平均速度:

字段(Field)映射(Mapping)
duration_minutes(持续时间)round(drop_timestamp - pickup_timestamp) → row ID
speed_mph()round(dist_miles ÷ (drop_timestamp - pickup_timestamp)) → row ID

映射Mapping

我们要使用的每个列(column)都必须根据某些规则映射到字段(fields)row ID的组合。 有很多方法可以实现这个映射,出租车数据集为我们提供了一个可能性的很好的描述。

0列(colums) --> 1字段(field)

cab_type: 每一行表示一个出租车的类型,在一行数据中有一些bit位来表示一次乘车中使用的出租车类型。 这是一个简单的枚举映射,例如黄色yellow=0,绿色green=1等。这个字段值的位宽(bits)由数据源确定。也就是说,我们有几个数据来源(NYC出租车-黄色, NYC出租车-绿色,Uber汽车),对于每一个数据来源,要设置不同的cab_type常量值。

1列(colums) --> 1字段(field)

以下三个字段以简单的直接方式从原始数据的单个列进行映射。
dist_miles:每一行值(row ID)表示乘车的距离,这个映射关系很简单:例如行值1(row 1)表示乘车距离在[0.5,1.5]这个区间内。也就是说,我们将距离这个浮点值舍入为整数并将其直接用作row ID。通常,从浮点值到row ID的映射可以是任意的。
舍入映射实现简洁,简化了导入和分析。并且,它是人类可读的。 我们会看到这种模式多次使用。

PDK使用中,我们定义了一个Mapper,它是一个只返回整数row ID的函数。 PDK具有许多预定义的映射器,可以使用一些参数进行描述。 其中之一是LinearFloatMapper。在下面代码中它将线性函数应用于输入,并将其转换为整数,因此隐式处理舍入。

lfm := pdk.LinearFloatMapper{
    Min: -0.5,
    Max: 3600.5,
    Res: 3601,
}

MinMax定义线性函数,Res确定输出row ID的最大允许值。我们选择这些值以产生一个舍入到最接近的整数的行为。其他预定义的映射器也有自己的特定参数,通常是两个或三个。

这个映射函数(mapping function)是核心操作,但我们需要一些其他部分来定义整个过程,它封装在BitMapper对象中。此对象定义要使用的输入数据源的哪些字段(Field),如何解析它们(Parsers),要使用的映射(Mapper)以及要使用的字段的名称(Frame)。TODO更新,这是合理的。

pdk.BitMapper{
    Frame:   "dist_miles",
    Mapper:  lfm,
    Parsers: []pdk.Parser{pdk.FloatParser{}},
    Fields:  []int{fields["trip_distance"]},
},

这些相同的对象在JSON定义文件中表示:

{
    "Fields": {
        "Trip_distance": 10
    },
    "Mappers": [
        {
            "Name": "lfm0",
            "Min": -0.5,
            "Max": 3600.5,
            "Res": 3600
        }
    ],
    "BitMappers": [
        {
            "Frame": "dist_miles",
            "Mapper": {
                "Name": "lfm0"
            },
            "Parsers": [
                {"Name": "FloatParser"}
            ],
            "Fields": "Trip_distance"
        }
    ]
}

在这里,我们定义一个Mappers列表,每个Mappers都包含一个名称,我们将在稍后的BitMappers列表中用它来引用mapper。我们也可以使用Parsers进行此操作,但默认情况下可以使用一些不需要配置的简单解析器。 我们还有一个Fields列表,它只是一个字段名称(在源数据中)到列索引(在Pilosa中)的映射。 我们使用的BitMapper定义这些名字让人可读。

total_amount_dollars:在这里,我们再次使用舍入映射,因此每行代表一次行程的总成本,该成本将舍入映射为row ID。 BitMapper定义与前一个定义非常相似。

passenger_count:此列包含小整数,因此我们使用最简单的映射之一:列值是就是row ID。

1列(colums) --> 多字段(multiple field)

使用复合数据类型(如时间戳timestamp)时,有很多映射选项。 在这种情况下,我们希望看到有趣的周期性趋势。因此,我们希望以一种允许我们在分析过程中独立查看它们的方式对时间的循环分量进行编码。

我们通过将时间数据存储在每个时间戳的四个单独字段中来实现此目的:对于年year``月month``日day时间time各一个。 前三个是直接映射的。例如,乘坐的日期2015/06/24将在字段year的第2015行,字段month的第6行和字段day的第24行中设置。

我们可能会在几小时、几分钟和几秒钟内继续使用这种模式,但是我们在这里没有太多使用这种精度,所以我们使用bucketing方法。 也就是说,我们选择一个分辨率(30分钟),将日期划分为该大小的桶,并为每个桶创建一行。 因此,6:45 AM时间在time_of_day字段的第13行中设置了bit位(位图中)。

我们针对每个感兴趣的时间戳执行所有这些操作,一个用于上车时间,一个用于下车时间。 这为我们提供了两个时间戳的总字段:pickup_yearpickup_monthpickup_daypickup_timedrop_yeardrop_monthdrop_daydrop_time

多列(multiple colums) --> 1字段(field)

乘坐数据还包含地理定位数据:上车和下车的纬度和经度。 我们只是希望能够生成乘坐位置的粗略概述热图,因此我们使用网格映射。 我们将感兴趣的区域划分为经纬度空间中的100x100矩形网格,使用单个整数标记此网格中的每个单元格,并使用该整数作为row ID。

我们为每个感兴趣的位置做了所有这些,一个用于上车,一个用于下车。 这为两个位置提供了两个字段:pickup_grid_iddrop_grid_id

同样,位置数据有许多映射选项。 例如,我们可能会转换为不同的坐标系,应用投影或将位置聚合到实际区域(如邻域neighborhoods)。 这里,简单的方法就足够了。

复合映射(Complex mappings)

我们还期望寻找行程持续时间和速度的趋势,因此我们希望在导入过程中捕获此信息。
对于字段duration_minutes我们使用round((drop_timestamp - pickup_timestamp).minutes)计算row ID
对于字段speed_mph我们使用round(dist_miles / (drop_timestamp - pickup_timestamp).minutes)计算row ID`。
这些映射计算很简单,但由于它们需要对多列进行算术运算,因此在PDK中提供的基本映射器中捕获它们有点过于复杂。 相反,我们定义自定义映射器来完成工作:

durm := pdk.CustomMapper{
    Func: func(fields ...interface{}) interface{} {
        start := fields[0].(time.Time)
        end := fields[1].(time.Time)
        return end.Sub(start).Minutes()
    },
    Mapper: lfm,
}

导入处理

设计这个架构和映射后,我们可以使用PDK导入工具读取的JSON定义文件捕获它。
运行pdk taxi会根据此文件中的信息运行导入。 有关更多详细信息,请参阅PDK部分,或查看代码本身。

查询

现在我们可以运行一些示例查询。

可以在一个PQL调用对cab类型进行检索、排序、计数。

TopN(cab_type)
{"results":[[{"id":1,"count":1992943},{"id":0,"count":7057}]]}

可以使用类似的调用检索高流量位置ID。 这些ID对应于经纬度,可以从生成ID的映射中反算。

TopN(pickup_grid_id)
{"results":[[{"id":5060,"count":40620},{"id":4861,"count":38145},{"id":4962,"count":35268},...]]}

每个passenger_count(乘客计数)的total_amount(总金额)的平均值可以通过一些后处理来计算。我们使用少量的TopN调用来检索passenger_count的行程计数, 然后使用这些计数来计算平均值。

queries = ''
pcounts = range(10)
for i in pcounts:
    queries += "TopN(Row(passenger_count=%d), total_amount_dollars)" % i
resp = requests.post(qurl, data=queries)

average_amounts = []
for pcount, topn in zip(pcounts, resp.json()['results']):
    wsum = sum([r['count'] * r['key'] for r in topn])
    count = sum([r['count'] for r in topn])
    average_amounts.append(float(wsum)/count)

注意,BSI-powered Sum查询现在提供了这种查询的替代方法。

有关更多示例和详细信息,请参阅此ipython notebook

 类似资料: