MongoDB-M101-Aggregation

MongoDB-M101-CRUD 中介绍了MongoDB的查询方式都是比较简单的不能用来做复杂的查询和运算。MongoDB提供类似SQL这样查询数据集合的工具——aggregation框架。aggregation框架的工作原理类似pipeline,也就是把当前流程的输出当作下一个流程的输入,这个过程不断进行下去,最后得到最终结果。Binary, Symbol, MinKey, MaxKey, DBRef, Code, and CodeWScope这些类型是不能使用pipeline的。SQL与Aggregation框架的对照表如下:

SQL Terms, Functions, and Concepts MongoDB Aggregation Operators Input:Output
where $match N : 1
group by $group N : 1
having $match N : 1
select $project 1 : 1
order by $sort 1 : 1
limit $limit No
sum() $sum No
count() $sum No
join No direct corresponding operator; however, the $unwind operator allows for somewhat similar functionality, but with fields embedded within the document. No

例子

使用MongoDB提供的zips.jsonposts.json数据来进行测试。zips的数据结构如下:

> db.zips.findOne()
{
    "city" : "ACMAR",
    "loc" : [
        -86.51557,
        33.584132
    ],
    "pop" : 6055,
    "state" : "AL",
    "_id" : "35004"
}

$match

$match主要是用于过滤,语法与find()中使用过滤类似。
可以是单个条件做过滤,比如只显示city是ACMAR的记录

db.zips.aggregate([{$match: {city: "ACMAR"}}])

也可以是多个条件做过滤,比如显示state是AL且pop大于40000的记录

db.zips.aggregate([{$match:{state: "AL", pop: {$gt: 40000}}}])

aggregation框架查出来的结果包括两部分,一部分是结果”result”数组,一部分是校验查询是否成功”ok”。

> db.zips.aggregate([{$match: {city: "ACMAR"}}])
{
    "result" : [
        {
            "city" : "ACMAR",
            "loc" : [
                -86.51557,
                33.584132
            ],
            "pop" : 6055,
            "state" : "AL",
            "_id" : "35004"
        }
    ],
    "ok" : 1
}

$group

$group主要是用于分组统计的。$group的第一个字段一定要是”_id”,如果不是的话就会直接报错。
实现select count(*) as count from zips的方式如下

> db.zips.aggregate([{$group:{"_id": null,count: {"$sum": 1}}}])
{ "result" : [ { "_id" : null, "count" : 29467 } ], "ok" : 1 }
> db.zips.find().count()
29467

实现select count(pop) as count from zips group by state的方式如下,需要统计的字段需要”$pop”这样的写法,前面是美元符号,然后用双引号括住。

db.zips.aggregate([{$group:{"_id": "$state",count: {"$sum": "$pop"}}}])

实现select count(pop) as from zips where state = "AL" group by state

select count(pop) as from zips where group by state having state = "AL"可以用$group和$match组合使用。

这条SQL语句最组要的区别是先过滤还是先分组的问题,先过滤再分组的话就先执行$match部分,然后用$match输入的结果进行$group,也就是pipeline;反之就是先$group再$match。由于$group是在内存中计算的,所以比较好的实现是先过滤出需要的数据,再对过滤出来的数据进行分组。
由于aggregate处理数据的方式是pipeline,所以$group执行完之后输出的结果只包括“_id”和“count”字段,也就是后面跟着要运行的$match得到的输入数据也是只有“_id”和“count”字段,因此如果$match中使用”state”或者”$state”都不会得到正确的数据,具体可以看下面的代码实现。

> db.zips.aggregate([{$match:{state:"AL"}}, {$group:{"_id": "$state",count: {"$sum": "$pop"}}}])
{ "result" : [ { "_id" : "AL", "count" : 4040587 } ], "ok" : 1 }
> db.zips.aggregate([{$group:{"_id": "$state",count: {"$sum": "$pop"}}},{$match:{"_id":"AL"}}])
{ "result" : [ { "_id" : "AL", "count" : 4040587 } ], "ok" : 1 }
> db.zips.aggregate([{$group:{"_id": "$state",count: {"$sum": "$pop"}}},{$match:{"state":"AL"}}])
{ "result" : [ ], "ok" : 1 }
> db.zips.aggregate([{$group:{"_id": "$state",count: {"$sum": "$pop"}}},{$match:{"$state":"AL"}}])
{ "result" : [ ], "ok" : 1 }
$addToSet

$addToSet是如果一个值在数组中不存在,就把这个值添加到数组。用于获取不存在相同元素的数组。

> db.posts.aggregate([{$group:{"_id": null,titles:{$addToSet:"$title"}}}])
{
    "result" : [
        {
            "_id" : null,
            "titles" : [
                "Declaration of Independence",
                "Gettysburg Address",
                "Bill of Rights",
                "US Constitution"
            ]
        }
    ],
    "ok" : 1
}
$push

$push与$addToSet很相似,但是$push不会判断值是否在数组中存在,所有值无论是否重复都会添加到数组中。

db.posts.aggregate([{$group:{"_id": null,titles:{$push:"$title"}}}])
$sum $avg

$sum是用于汇总,$avg是用于求平均数的。

db.zips.aggregate([{$group:{"_id": "$state",count: {"$sum": "$pop"}}}])
db.zips.aggregate([{$group:{"_id": "$state",count: {"$avg": "$pop"}}}])
$max $min

$max是求最大值,$min是求最小值。

db.zips.aggregate([{$group:{"_id": "$state", maxpop:{$max:"$pop"}}}])
db.zips.aggregate([{$group:{"_id": "$state", minpop:{$min:"$pop"}}}])
$first $last

$first是取第一条记录,$last是取最后一条记录。
下面的实例是先根据state和city分组,计算出population,然后根据state和population排序,最后根据state分组,获取每个组里面populatin第一或者最后的一个city。

db.zips.aggregate([{$group: {_id: {state:"$state", city:"$city"},population: {$sum:"$pop"}}}, {$sort:{"_id.state":1,"population":-1}}, {$group: {_id:"$_id.state", city:{$first:"$_id.city"}}}])
db.zips.aggregate([{$group: {_id: {state:"$state", city:"$city"},population: {$sum:"$pop"}}}, {$sort:{"_id.state":1,"population":-1}}, {$group: {_id:"$_id.state", city:{$last:"$_id.city"}}}])

$sort

$sort主要是对结果集进行排序。
实现select count(pop) as count from zips group by state order by count的方式如下

db.zips.aggregate([{$group:{"_id": "$state",count: {"$sum": "$pop"}}},{$sort: {count:1}}])

$sort、$skip、$limit同时用在aggregate中执行的顺序是那个写在前面就先执行那个,与find执行的时候是先sort,然后skip,最后才是limit这样的顺序完全不同。

> db.zips.aggregate([{$group:{"_id": "$state",count: {"$sum": "$pop"}}},{$sort: {count:1}}, {$skip:50}])
{ "result" : [ { "_id" : "CA", "count" : 29760021 } ], "ok" : 1 }
> db.zips.aggregate([{$group:{"_id": "$state",count: {"$sum": "$pop"}}},{$skip:50},{$sort: {count:1}}])
{ "result" : [ { "_id" : "ME", "count" : 1227928 } ], "ok" : 1 }

$skip

$skip简单来说就是抛弃前面的N个结果,常用在分页中。

> db.zips.aggregate([{$skip:29465}])
{
    "result" : [
        {
            "city" : "SMOOT",
            "loc" : [
                -110.922351,
                42.619238
            ],
            "pop" : 414,
            "state" : "WY",
            "_id" : "83126"
        },
        {
            "city" : "THAYNE",
            "loc" : [
                -111.011354,
                42.933026
            ],
            "pop" : 505,
            "state" : "WY",
            "_id" : "83127"
        }
    ],
    "ok" : 1
}

$limit

$limit是用来限制输出的结果数的。

> db.zips.aggregate([{$limit:1}])
{
    "result" : [
        {
            "city" : "ACMAR",
            "loc" : [
                -86.51557,
                33.584132
            ],
            "pop" : 6055,
            "state" : "AL",
            "_id" : "35004"
        }
    ],
    "ok" : 1
}

$project

$project 的作用是选择、构建需要的结果输出,包括重命名、添加字段、删除字段,这都不会修改数据库原有的内容的。
$project默认的输出字段是包括”_id”,可以用0、1控制是否显示。

> db.zips.aggregate([{$project: {city:1}}, {$limit:1}])
{ "result" : [ { "city" : "ACMAR", "_id" : "35004" } ], "ok" : 1 }
> db.zips.aggregate([{$project: {_id:0,city:1}}, {$limit:1}])
{ "result" : [ { "city" : "ACMAR" } ], "ok" : 1 }

重命名

> db.zips.aggregate([{$project: {_id:0,城市:"$city"}}, {$limit:1}])
{ "result" : [ { "城市" : "ACMAR" } ], "ok" : 1 }

添加字段

> db.zips.aggregate([{$project: {_id:1,where:{city:"$city",state:"$state"}}}, {$limit:1}])
{
    "result" : [
        {
            "_id" : "35004",
            "where" : {
                "city" : "ACMAR",
                "state" : "AL"
            }
        }
    ],
    "ok" : 1
}

$unwind

$unwind是用来处理数组类型字段的。假如一条记录里面的数组字段有10个值,可以使用$unwind把这条记录拆成10条。

> db.posts.aggregate({$project:{tags:1}},{$limit:1})
{
    "result" : [
        {
            "_id" : ObjectId("50ab0f8bbcf1bfe2536dc3f8"),
            "tags" : [
                "trade",
                "fowl",
                "forecast",
                "pest",
                "professor",
                "willow",
                "rise",
                "brace",
                "ink",
                "road"
            ]
        }
    ],
    "ok" : 1
}
> db.posts.aggregate({$project:{tags:1}},{$limit:1},{$unwind:"$tags"})
{
    "result" : [
        {
            "_id" : ObjectId("50ab0f8bbcf1bfe2536dc3f8"),
            "tags" : "trade"
        },
        {
            "_id" : ObjectId("50ab0f8bbcf1bfe2536dc3f8"),
            "tags" : "fowl"
        },
        {
            "_id" : ObjectId("50ab0f8bbcf1bfe2536dc3f8"),
            "tags" : "forecast"
        },
        {
            "_id" : ObjectId("50ab0f8bbcf1bfe2536dc3f8"),
            "tags" : "pest"
        },
        {
            "_id" : ObjectId("50ab0f8bbcf1bfe2536dc3f8"),
            "tags" : "professor"
        },
        {
            "_id" : ObjectId("50ab0f8bbcf1bfe2536dc3f8"),
            "tags" : "willow"
        },
        {
            "_id" : ObjectId("50ab0f8bbcf1bfe2536dc3f8"),
            "tags" : "rise"
        },
        {
            "_id" : ObjectId("50ab0f8bbcf1bfe2536dc3f8"),
            "tags" : "brace"
        },
        {
            "_id" : ObjectId("50ab0f8bbcf1bfe2536dc3f8"),
            "tags" : "ink"
        },
        {
            "_id" : ObjectId("50ab0f8bbcf1bfe2536dc3f8"),
            "tags" : "road"
        }
    ],
    "ok" : 1
}
> 

在blog系统中要实现统计博文的tag出现的次数可以使用$unwind实现。下面的例子只取出现次数最多的3个tag。

> db.posts.aggregate({$project:{tags:1}},{$unwind:"$tags"},{$group: {_id:"$tags",count:{$sum:1}}},{$sort:{count:-1}},{$limit:3})
{
    "result" : [
        {
            "_id" : "sphynx",
            "count" : 13
        },
        {
            "_id" : "lunchroom",
            "count" : 12
        },
        {
            "_id" : "puppy",
            "count" : 11
        }
    ],
    "ok" : 1
}

对于统计tag出现的次数也可以使用MapReduce,下面是我用Go语言写的示例。连接MongoDB用的是mgo。Aggregate可以在一定程度上实现需要MapReduce的任务。

type Result struct {
    Key   string "_id"
    Value int
}

func GetTags() (result []Result) {
    job := &mgo.MapReduce{
        Map: "function() { " +
            "    this.Tags.forEach( " +
            "        function(z){emit(z,1);})}",
        Reduce: "function(key, values) { " +
            "    var total=0; " +
            "    for(var i=0;i<values.length;i++){ " +
            "        total += values[i];} " +
            "    return total;}",
    }
    conn, err := mgo.Dial(HOST)
    defer conn.Close()
    db := conn.DB(DATABASE)
    articles := db.C("articles")
    _, err = articles.Find(nil).MapReduce(job, &result)
    if err != nil {
        panic(err)
    }

    return result
}