Dong Guo's Blog

Spark使用经验分享

| Comments

Spark是一个基于内存的分布式计算engine,最近1-2年在开源社区(github)和工业界非常火,国内的一些公司也搭建自己的spark集群。典型的应用场景是大数据上的机器学习模型的训练以及各种数据分析。下面是我理解的spark的优势:

  1. Spark使得分布式编程更简单

    Spark将实际分布在众多Nodes上的数据抽象成RDD(resilient distributed dataset),使得我们可以像本地数据一样进行处理。同时,Spark提供了相比MapReduce更丰富的API,相比MapReduce编程更加简单。

  2. Spark通过充分利用内存提高计算效率

    随着数据量越来越大,内存越来越便宜,使用较多的内存让(某些类型的)计算效率提升10至100倍,对很多公司来说是比较划算的。Spark和Facebook的Presto都基于这样的思想。在Spark中,你可以指定将那些在后续需要被多次使用的RDD缓存在内存中,减少了IO的开销,可以显著提高如机器学习模型训练这种需要迭代计算的应用的效率。

  3. Spark提供了一整套的数据分析和计算解决方案,降低了学习和维护成本

    • Spark本身支持做batch的计算,比如每天机器学习模型的训练,各种数据的处;
    • Spark Streaming可以用来做realtime计算和数据处理,Spark Streaming的API和Spark的比较类似,其实背后的实现也是把一段段的realtime数据用batch的方式去处理;
    • MLlib实现了常用的机器学习和推荐算法,可以直接用或者作为baseline;
    • Spark SQL使得可以通过SQL来对Hive表,Json文件等数据源进行查询,查询会被转变为一个Spark job;
    • 还有GraphX, 我没有用过,其用于一些图相关的计算;

  4. Spark可以和MapReduce通过YARN共享机器资源

    所有的存储(HDFS),计算,内存资源都可以共享

个人使用Spark的一些经验总结

  1. 理解spark application的运行原理, 可以避免犯很多错误 Driver中涉及到RDD操作的代码(比如RDD.map{}中的代码)需要Serialize后由Driver所在的Node传输给Executors所在的Nodes,并做Deserialize后在executors上执行,RDD操作中涉及到的数据结构,比如map中用到了一个user_id –> user_profile的hashtable,也需要由Driver所在的Node传输给Executors所在的Nodes。理解了这点就可以更好理解下面2点分享

  2. 保证Rdd操作中的代码都是可序列化的,否则会有NonSerializableException

    一种常见的错误是,在rdd1.map{objectOfClassA.fun}中,对象objectOfClassA所属的类ClassA需要是可序列化的,这也以为ClassA中用到的所有成员属性都是可序列化的。如果classA使用的某个成员属性无法序列化(或者标识为Serializable),scala中可以通过@transient关键字标明序列化ClassA时不序列化该成员变量。推荐stakoverflow的2个讨论:link1 link2

  3. 正确地使用广播变量(broadcast variables)

    如果我们有一份const数据,需要在executors上用到,一个典型的例子是Driver从数据库中load了一份数据dbData,在很多RDD操作中都引用了dbData,这样的话,每次RDD操作,driver node都需要将dbData分发到各个executors node一遍(分享1中已经介绍了背景),这非常的低效,特别是dbData比较大且RDD操作次数较多时。Spark的广播变量使得Driver可以提前只给各个executors node传一遍(spark内部具体的实现可能是driver传给某几个executors,这几个executors再传给其余executors)。使用广播变量有一个我犯过的错误如下:

     val brDbData = sparkContext.broadcast(dbData) //broadcast dbDataA, and name it as brDbData
     val dbDataB = brDbData.value //no longer broadcast variable
     oneRDD.map(x=>{dbDataB.getOrElse(key, -1); …})
    第一行将dbData已经广播出去且命名为brDbData,一定要在RDD操作中直接使用该广播变量,如果提前提取出值,第三行的RDD操作还需要将dbData传送一遍。正确的代码如下
     val brDbData = sparkContext.broadcast(dbData) //broadcast dbDataA, and name it as brDbData
     oneRDD.map(x=>{brDbData.value.getOrElse(key, -1); …})

  4. 使用yarn-client或者yarn-cluster模式运行spark应用之前,在IDE中配置spark local模式调试以及测试好代码

    spark的yanr-client或者yarn-cluster模式做一次测试比较耗时,因为涉及到代码打包以及上传。在IDE(推荐IntelliJ)中配置local模型用于debug和测试,将显著提升开发和测试效率;

    在VM option中配置:”-Dspark.master=local -Dspark.app.name=Test -Xmx2G” (also increase maximal memory for Heap)

  5. 充分利用spark的并行性

    理想的情况是整个代码的逻辑是对一个或几个RDD做处理,这时候spark的并行性往往是充分利用的。有时候代码逻辑会更复杂,比如你需要统计一年中每一天的一些数值,由于代码逻辑比较复杂,一种简单的“偷懒”方式是用一个for循环,在for循环内部做RDD的操作,这种情况是要努力避免的,务必思考将不同date的统计并行化。我写过的两个应用中都遇到了这种情况:优化之后速度提升非常明显。

  6. 使用cache()操作

    cache RDD需要考虑自己有多少内存,对于后续不需要多次使用的RDD不要cache,如果内存有限却又指定要cache,大量的时间将被花在memory和disk的in-out上

  7. 为spark-submit选择合适的参数

    spark-submit用于提交spark job,其可配置为job申请多少资源,包括Driver的内存和cpu,executor的个数,每个executor的内存,cpu和线程数。如果使用yarn做资源管理,只有内存是硬性占有的,一个job过多地申请内存,将会有资源浪费,可能会使别的job因为申请不到足够的内存无法跑。可以用JMX(Java Management Extension)来监控你的spark job到底消耗多少内存,可以指导你申请合适的内存大小。

  8. Spark可以访问众多的数据源:比如HDFS, HBase, Cassandra或者Hive表(Hive on Spark), 直接得到一个RDD用作后续处理

Programmatic Digital Advertising

| Comments

从2010年开始开始工作,一直在做搜索广告,联盟上下文广告以及视频广告。这几年RTB,或者更广义的广告的程序化交易非常火。在这边总结一下我的理解。

“程序化交易“即通过程序自动化地完成商品的买和卖,这在很多领域都有应用,最经典的就是金融市场中的股票,债券,货币,期货等的交易。其至少有2个好处,第一:将人从交易的各个环节中解放出来,让一切变得自动化,有效率;第二:这种程序带来的自动化使得更大范围(甚至全球)资源的优化配置变得有可能。

广告也不例外。相比于传统线下媒体(比如电视)广告,互联网广告(这里我们默认是显示广告,不包括由几家巨头独营的搜索广告市场)有很多优势,最显著的也是2点,第一:用户在互联网的数据可以被用来做广告的精准投放,比如你在浏览很多网站的时候都有可能看到京东的广告,而广告的内容是你之前在京东浏览过的商品;第二:也是由于数据的获取更方便,互联网广告的质和量能更容易被度量,比如广告主可以仅为点击而不是展示付费,且点击次数很容易track。

当然这还不够完美,对于一个广告主,其终极目标是把自己的广告投放到全球最相关的用户上,可能出现在上万个不同的站点上,而自己只需要定义每次展示,点击,或者转化的付费(当然也可以为不同的站点指定不同的单价),广告主或者其代理不需要和各个内容提供商或内容提供商的代理分别扯皮。联盟广告(如百度联盟,阿里妈妈)实现了广告交易的程序化,但仅是其中特殊的一种。

在线广告程序化交易

在线广告程序化交易有如下一些stakeholders: DSP(Demand side platform)可以认为其是一个有技术背景的代理商,广告主可以选择为展示,点击或者转化进行付费,DSP则和其他众多DSP通过竞价竞争广告位。DSP的数据和技术积累直接关系到其是否NB. SSP(Supply side platform)则将publishers进行整合。Ad Network在这个生态里扮演一个中枢的角色。

注意为了实现广告的程序化交易,除了advertisers和publishers, 其他角色都不是必须的,因为广告主和内容提供商之间完全可以直接交易。对于百度联盟,可以认为其同时扮演了Ad Network和DSP的角色,如果你既有内容资源又有广告主资源,你当然不希望和别人分享收益。

可以按照2个纬度(广告投放量是否被保证,以及是固定单价还是需要竞价)将在线广告的程序化交易分为4个类别: reference

Inventory即广告流量,广告主往往比较关心广告投放量是否能够放完,特别是对于大的品牌广告主。保证广告投放量的deal就是传统的Guaranteed deal. 对于广告商来说,其为广告的付费一直是fixed,其可以选择是为每次展示付1毛钱还是为每次点击付10块钱。这个表格中提到的Pricing指的是publisher或者SSP端看到的报价,而这个价格通常指的都是展示的单价(CPM),如果广告主选择以CPM付费(给DSP或者SSP或者直接给publisher),publisher端看到的该由该advertiser声称的报价通常是稳定的,但是如果广告主选择以点击甚至转化付费,DSP对不同的广告位的出价的差别就很可能由很大的差别。

Automated Guaranteed相比传统的广告售卖方式差别不大,主要是让流程变得自动化。注意premium内容的售卖通常是这种方式(比如Hulu以及yahoo首页),这就意味着程序化交易的scope超脱了狭义的RTB, 也适用并有利于premium内容的售卖;

Unreserved Fixed Rate由比较大的实际意义,对于premium的内容,其大部分的Inventory还是会通过传统的方式一笔一笔和大广告主谈(大广告主的deal细节比较复杂),剩余有多少蛋糕可以卖给中小广告主?很难保证,所以通过不保量的方式程序化售卖出去是一个很理想的选择;

Invitation-Only Auction和Open Auction差别不大,只是前者限制只接入部分DSP或者advertisers,2者都是我们常说的RTB。

实时竞价(Real-Time-Bidding)

这边不展开介绍,有2个资源对RTB有很清晰的介绍

Youtube: how an Ad is Served with Real Time Bidding

iPinyou 沈学华的一篇科普

RTB这块对数据的要求还是比较高的。涉及到全互联网的数据共享,用户隐私又是一个棘手的问题。想想央视某年的315报道。

新变化和问题

相比RTB一直给人在垃圾流量做买卖的印象,越来越多高质量的publisher愿意接入到Ad Network中,小广告主也有机会了,程序化交易越来越普及;

质量的控制,包括广告的质量以及内容的质量;标准的统一,比如广告位的尺寸,视频广告的清晰度和风格等; 数据的获取以及隐私;

一些推荐的资源

Interactive Advertising Bureau: 一家致力于建立标准,推动在线广告行业发展的公司 IAB programmatic

TOP 10 thinks you need to know about programmatic

Youtube: how an Ad is Served with Real Time Bidding

iPinyou 沈学华的一篇科普

TubeMogul, 一家典型的DSP公司

Challenges for a Word-class Ad Inventory Forecasting System

| Comments

Key words: Ad Inventory Forecasting, Ad serving, Online advertising.

广告流量预估是各家采取保量模式售卖广告位的公司都必须要做的,无论是传统的电视媒体还是各家互联网公司。 一句话介绍就是给定未来任意一段时间区间(通过在一年内),在任意给定定向条件(比如demographic限定,geographic限定,上下文内容限定,平台限定,时间段限定等)下,预估对于各种形态的广告(比如视频,图片,文本广告)分别有多少可以卖。

实现一个优秀的广告流量预估系统的挑战在什么地方呢?至少包括如下几点

  1. 在大数据量下保证快速的查询响应时间

    • 大数据体现在2点,首先是广告数据条目多,另外是定向条件多。当有100类定向条件,每类可以有2种取值时,不同定向条件的组合数目虽然不会到2100级别,但到billion级别还是可能的。如何做到近于实时的查询?

  2. 复杂多样的干扰因素对预估准确性的影响

    • 业务本身的波动性对广告流量的影响
    • 业务变动对广告流量的影响
    • 突发时间对广告流量的影响

  3. 具体业务逻辑的复杂性增加了系统逻辑的复杂性

    • 典型的业务流程是来了一个广告订单,在系统种查询是否有足够的流量可以售卖,但是查询得到的流量是满足定向条件的总流量,而单个订单的在投放过程种会有各种约束,比如不能给单个用户在一天中重复播放同一个广告商的广告。所以实际能够售卖给该订单的广告量一定少于查询到的总流量。这就需要在预估中考虑广告播放的频率限制;

  4. 和Ad server(广告投放服务器)逻辑的协调

    • 通常同一个广告位会有多个广告qualify,Ad server决定了具体放哪个广告。在ad server逻辑不发生变化的情况下,可以利用历史数据(广告总量在各个定向条件上的分布)进行预估,但是一旦ad server逻辑发生变动,广告流量预告系统最好能实时作出调整,而不是收集了一个月数据之后才发应过来。

  5. 流量预估只是第一步,流量的管理或者说全局的统筹优化是最大化收益的必要。

Druid介绍与实践

| Comments

关键词

Druid, column-stores, distributed system, bitmaps indexing

应用场景

最近在设计一个系统来预估未来一年的广告流量,不是总流量,是任意时间段任何定向(Targeting)条件约束情况下的流量。定向条件有近百种(内容类别,设备平台,用户地域,用户人口属性等),整个时间区间不同组合数(也就是数据行数)是亿级别。目标是秒级的查询响应时间。一个简单的数据例子如下:

存储系统选择

Mysql不是适合的选择

最容易想到的是用Mysql作为数据存放和查询引擎,由于数据行数太多,Mysql必须通过创建索引或者组合索引来加速查询。典型的查询包含若干个定向类别,这些定向条件的组合是非常多的(top 80%的查询也会包含几十种组合),故需要创建非常多的组合索引,代价很高。另外,对于那些没有创建组合索引的查询,查询时间完全不能接受。实际测试结果是加了组合索引后整体查询速度提升有限。

为什么没有用Hbase或者Hive

Hbase本身是一个经典的基于hdfs的分布式存储系统,通常来说其是行存储的,当创建column families之后,每个column family是列存储的(代价就是当通过key查询某行的时候,需要从多个不连续的存储空间读数据,具体可参考)。在这个应用中,可以为每个定向类别(包括日期)创建一个单独的column family,但是据我所知Hbase本身没有为column family创建bitmap indexing(参考),查询速度应该会受到影响。另外不用Hbase的一个原因是我希望存储系统尽量轻量级,最好不要安装hadoop。

Hive将查询转化为M/R任务,没法保证查询的快速响应(比如M/R cluster资源竞争很激烈时),而且使用Hive需要以来hadoop cluster,对这个应用来说也略微重量级。

我们需要一个高可用的分布式的列存储系统

我们的核心需求包含2点,一是查询速度快,二是系统的拓展性好,最好是分布式的。

第一点要求意味着最好用column-store而不是row-store,在这个应用中,虽然定向类别有近百种,但是单次查询通常只会涉及几个。对于修改操作较少且查询往往只涉及少数几列的场景使用column-store可以获得快一个量级的查询速度。而且column-store可以通过bitmap indexing,encoding,以及compression来优化查询速度和存储开销。还存储还是列存储

第二点要求一方面是由于我们的数据量较大,并行存储和查询可以减少时间开销,另一方面是数据量每年还在快速上涨,以后可以简单地通过加机器来应对。

对系统的其他要求比较普遍:系统可用性要高,稳定,轻量级,易于上手。

为什么Druid是适合的选择

Druid满足我们上面2点要求,其是一个开源的、分布式的、列存储系统,特别适用于大数据上的(准)实时分析统计。且具有较好的稳定性(Highly Available)。 其相对比较轻量级,文档非常完善,也比较容易上手。

Druid介绍

如何搭建一个Druid cluster请参考我另一篇文章

概念

Segment: Druid中有个重要的数据单位叫segment,其是Druid通过bitmap indexing从raw data生成的(batch or realtime)。segment保证了查询的速度。可以自己设置每个segment对应的数据粒度,这个应用中广告流量查询的最小粒度是天,所以每天的数据会被创建成一个segment。注意segment是不可修改的,如果需要修改,只能够修改raw data,重新创建segment了。

架构

Druid本身包含5个组成部分:Broker nodes, Historical nodes, Realtime nodes, Coordinator Nodes和indexing services. 分别的作用如下:

  • Broker nodes: 负责响应外部的查询请求,通过查询Zookeeper将请求划分成segments分别转发给Historical和Real-time nodes,最终合并并返回查询结果给外部;
  • Historial nodes: 负责’Historical’ segments的存储和查询。其会从deep storage中load segments,并响应Broder nodes的请求。Historical nodes通常会在本机同步deep storage上的部分segments,所以即使deep storage不可访问了,Historical nodes还是能serve其同步的segments的查询;
  • Real-time nodes: 用于存储和查询热数据,会定期地将数据build成segments移到Historical nodes。一般会使用外部依赖kafka来提高realtime data ingestion的可用性。如果不需要实时ingest数据到cluter中,可以舍弃Real-time nodes,只定时地batch ingestion数据到deep storage;
  • Coordinator nodes: 可以认为是Druid中的master,其通过Zookeeper管理Historical和Real-time nodes,且通过Mysql中的metadata管理Segments
  • Druid中通常还会起一些indexing services用于数据导入,batch data和streaming data都可以通过给indexing services发请求来导入数据。

Druid还包含3个外部依赖

  • Mysql:存储Druid中的各种metadata(里面的数据都是Druid自身创建和插入的),包含3张表:”druid_config”(通常是空的), “druid_rules”(coordinator nodes使用的一些规则信息,比如哪个segment从哪个node去load)和“druid_segments”(存储每个segment的metadata信息);
  • Deep storage: 存储segments,Druid目前已经支持本地磁盘,NFS挂载磁盘,HDFS,S3等。Deep Storage的数据有2个来源,一个是batch Ingestion, 另一个是real-time nodes;
  • ZooKeeper: 被Druid用于管理当前cluster的状态,比如记录哪些segments从Real-time nodes移到了Historical nodes;

查询

Druid的查询是通过给Broker Nodes发送HTTP POST请求(也可以直接给Historical or Realtime Node),具体可见Druid官方文档。查询条件的描述是json文件,查询的response也是json格式。Druid的查询包含如下4种:

  • Time Boundary Queries: 用于查询全部数据的时间跨度
  • groupBy Queries: 是Druid的最典型查询方式,非常类似于Mysql的groupBy查询。query body中几个元素可以这么理解:
    • “aggregation”: 对应mysql”select XX from”部分,即你想查哪些列的聚合结果;
    • “dimensions”: 对应mysql”group by XX”,即你想基于哪些列做聚合;
    • “filter”: 对应mysql”where XX”条件,即过滤条件;
    • “granularity”: 数据聚合的粒度;
  • Timeseries queries: 其统计满足filter条件的”rows”上某几列的聚合结果,相比”groupBy Queries”不指定基于哪几列进行聚合,效率更高;
  • TopN queries: 用于查询某一列上按照某种metric排序的最常见的N个values;

本文小结

  1. Druid是一个开源的,分布式的,列存储的,适用于实时数据分析的系统,文档详细,易于上手;
    • Druid在设计时充分考虑到了Highly Available,各种nodes挂掉都不会使得druid停止工作(但是状态会无法更新);
    • Druid中的各个components之间耦合性低,如果不需要streaming data ingestion完全可以忽略realtime node;
    • Druid的数据单位Segment是不可修改的,我们的做法是生成新的segments替换现有的;
    • Druid使用Bitmap indexing加速column-store的查询速度,使用了一个叫做CONCISE的算法来对bitmap indexing进行压缩,使得生成的segments比原始文本文件小很多;
  2. 在我们的应用场景下(一共10几台机器,数据大概100列,行数是亿级别),平均查询时间<2秒,是同样机器数目的Mysql cluter的1/100 ~ 1/10;
  3. Druid的一些“局限”:
    • Segment的不可修改性简化了Druid的实现,但是如果你有修改数据的需求,必须重新创建segment,而bitmap indexing的过程是比较耗时的;
    • Druid能接受的数据的格式相对简单,比如不能处理嵌套结构的数据

参考资料&推荐阅读

  1. 官方文档
  2. google groups讨论区
  3. Druid: A Real-time Analytical Data Store
  4. Bitmap indexing wikepedia
  5. Bitmap indexing compression algorithm used by Druid
  6. 行存储or列存储?

Expectation Propagation: Theory and Application

| Comments

简介

第一次接触EP是10年在百度实习时,当时组里面正有计划把线上的CTR预估模型改成支持增量更新的版本,读到了微软一篇基于baysian的CTR预估模型的文章(见推荐阅读5),文章中没有给出推导的细节,自己也没有继续研究。今年在PRML中读Approximal inference这章对EP有了一些了解,同时参考了其它相关的一些资料,在这里和大家探讨。

什么是期望传播

期望传播(Expectation Propagation): 基于bayesian的一种近似推断方法,常用于图模型中计算单个节点的边缘分布或者后验分布,属于message passing这一类推断方法。

牛人

首先当然是Thomas Minka, 其在MIT读博期间提出了EP,并将EP作为博士论文课题在2001年发表。Minka毕业之后去了CMU教书,现在和Bishop一起在剑桥微软研究院。

其次是Kevin p. Murphy, 他是我做EP相关文献调研时发现的paper比较多的,我读到的一篇全文基本都是在推导Minka博士论文中一些公式的细节。btw Murphy 2013年出版了一本书,见推荐阅读2。

中英文对照

下面是一些关键词的中英文对应 (由于相关的书籍文献基本都是英文的,有些词没有想到比较好的中文翻译,故保留英文)

截断高斯: Truncated Gaussian

置信传播: Belief Propagation (后面会简称BP)

期望传播: Expectation Propagation (后面会简称为EP)

消息传递: Message passing

背景

EP本身的思想和方法都还是比较简单的,不过会涉及到一些背景知识,这边一并介绍。

高斯、截断高斯

EP的核心思想之一是用指数族分布近似复杂分布,实际应用中通常选择高斯分布,所以多个高斯分布的乘积,相除,积分在EP应用过程中不可避免。

截断高斯是高斯分布在指定区间归一化后的结果,(所以其并不是一个高斯分布),EP本身并不和截断高斯直接相关,但是如果在分类问题中应用EP,对观察样本(0-1)建模方法通常是y=sign(f(x)>t), 和另一个高斯分布相乘之后即为截断高斯分布。(然后就需要计算其的均值方差,原因后面会提到)

我在另一篇文章Gaussian and Truncated Gaussian中介绍了比较多的细节,可以参考。

指数族分布

指数族分布(exponential family distribution)有着非常好的特性,比如其有充分统计量,多个指数族分布的乘积依然是指数族分布,具体的介绍可以参见wikipedia, 介绍的非常全面,也可以参考PRML第2章。

由于指数族的良好特性,其常被拿去近似复杂的概率分布(EP和variance baysian都是)。由于EP中常常选择高斯分布,我们这边强调一下,高斯分布的充分统计量为: (x, x2), 其中x为高斯分布的自变量。

图模型

EP是贝叶斯流派的计算变量后验分布(或者说是边缘分布)的近似推断方法,通常都可以通过一个概率图模型来描述问题的生成过程(generation process),所以可以说图模型是EP的典型应用场景。

图模型在很多地方都有介绍,比如PRML第8章,在这里就不重复了。有1点提一下,一个图模型的联合分布(不管是有向图还是无向图)可以写成若干个因子的乘积,对于有向图每个因子是每个节点的条件分布(条件于其的所有直接相连的父节点),对于无限图每个因子是energy function。 这个特性在后面的置信传播算法会用到。

factor graph

图模型中节点之间的关系通过边来表达,factor graph将这种节点之间的关系通过显式的节点(factor node)来表达,比如对于有向图,每个factor node就代表一个条件概率分布,图中的所有的信息都存在于节点上(variable nodes和factor nodes)。

后面的BP和EP都基于factor graph,可以认为factor graph使得图上的inference方法变得比较直观,另一个好处是factor graph屏蔽了有向图和无向图的差异。(有向图无向图都可以转变为factor graph)

更多了解可以看PRML第8章。

置信传播

Belief Propagation (BP)又叫’sum-product’,是一种计算图模型上节点边缘分布的推断方法,属于消息传递方法的一种,非近似方法(基于其延伸的Loopy Belief propagation为近似推断方法)。 BP的核心为如下3点:

  • 单个variable node边缘分布的计算

(注:上图来之PRML)

前面提到过图模型的联合分布可以分解为若干因子的乘积,每个因子对应一个factor node:

每个variable node的边缘分布为与其直接相连的factor nodes传递过来的message的乘积:

  • 从factor node到variable node的消息传递

(注:上图来之PRML)

从factor node f传递到variable node x的message为:与f直接相连(除了x)的variable nodes传递到f的messages与f本身的乘积的积分(积分变量为与f直接相连的除x之外的所有variable nodes):

  • 从variable node到factor node的消息传递

(注:上图来之PRML)

从variable node x到factor node f的message为:与x直接相连的factor nodes(除f以外)传递到x的messages的乘积:

更多细节请参考PRML

Moment matching

在实际的问题中,要么后验分布本身比较复杂(推荐阅读3中的Clutter example),要么最大化后验的计算比较复杂,要么破坏了具体算法的假设(比如EP要求图中的所有message都是指数族),所以常常会用(有良好性质的)指数族分布近似实际的概率分布。

用一个分布去近似另一个分布的常见方法是最小化KL散度:

我们发现通过最小化KL散度得到的‘最接近’p(x)的q(x)可以简单地通过匹配充分统计量的期望得到。

当q(x)为高斯分布的时候,我们知道其充分统计量u(x)=(x, x2),这时通过KL散度最小化近似分布近似的方法称为moment matching(匹配矩)

为什么称为匹配矩呢,看看矩的定义就知道了:

期望传播方法-理论

EP的思想:在图模型中,用高斯分布近似每一个factors,然后’approximate each factor in turn in the context of all remaining facotrs’.

下面为具体的算法: (注:本算法参考了PRML)

下面通过Minka博士论文中的例子‘clutter problem’来解释:每个观察样本以(1-w)的概率由高斯分布N(x|sita, I)生成,以w的概率由noise生成(同样也是高斯分布N(x|0, aI)),于是:

按照EP的思想,我们用一个单高斯q(sita)去近似混合高斯p(x|sita)

单高斯去近似混合高斯听起来效果一定不好,但实际上,由于EP在近似的时候乘了其他所有factors的高斯近似之后的上下文,考虑到很多个高斯分布相乘之后的方差一般都很小,所有实际上单高斯只需要在很小的区间近似好混合高斯即可。如下图:

(注:上面2张图来之PRML)

其中蓝色曲线为混合高斯(没有画完整),红色曲线为近似的单高斯,绿色曲线为‘其它所有factor的乘积’。

EP怎么应用在message passing中:

在图模型中,所谓的’context of all remaining factors’就是当前节点之外所有节点和messages,所以EP在图模型中的使用方式为:和BP一样的方法计算message和marginal distribution,当某个factor或者marginal distribution不是高斯分布时,用高斯分布近似它。所以Minka认为EP也就是BP+moment matching。

由于每个factor以及variable node的边缘分布都是高斯分布(或被近似为高斯分布),所以EP的计算过程一般并不复杂。

期望传播方法-应用

EP被广泛地应用在图模型的inference上,这边提一下微软的2个应用:Bing的CTR预估,XBOX游戏中player skill的评估。

Bing的CTR预估

详细的推导及实验请参考:Bayesian CTR prediction for Bing paper中称这个model为ad predictor,其在我的数据集上预估效果很不错,训练预测速度快,天然支持增量更新,主要的缺点就是模型不是稀疏的。如果你知道怎么自然地达到稀疏效果,请指教。

和其它算法的比较请参考:Classification Models

XBOX中player skill的评估

图模型和上一篇略有差异,推导过程差不多,paper中没有给出详细的推导过程,不过Murphy的新书中给出了,请参考推荐阅读2。

一些小结

  1. EP的通用性比较好,对于实际的问题,画出graph model和factor graph,就可以尝试用EP来进行inference;
  2. 虽然应用EP时的推导过程略长(计算很多个message和marginal distribution),但是最终的整体的更新公式一般都非常简单,所以模型训练时间开销往往较小;
  3. 为了使用EP,只能用高斯分布来建模,比如Bing的CTR预估那篇对每个feature的weight建模,只能假设服从高斯分布,相当于是2范数的正则化,不能达到稀疏模型的效果;
  4. 在我的实验中,通过EP进行inference得到的模型预估效果不错,值得一试;

推荐阅读

  1. 机器学习保留书籍:Pattern recognition and machine learning 第2,8,10章 (第2章看看高斯四则运算,指数族分布特性;第8章了解图模型基础,期望传播算法;第10章了解期望传播算法)

  2. Murphy新书: Machine Learning: A Probabilistic Perspective 第22章 (本书相比PRML更加具体,第22章干脆包含了TrueSkill的详细推导步骤)

  3. Minka的博士论文:A family of algorithms for approximate Bayesian inference (想了解基本思想和理论看完前3节即可)

  4. EP的应用之一:TrueSkill: A Bayesian Skill Rating System (文中并没有给出EP每一步的细节)

  5. EP的应用之二:Web-Scale Bayesian Click-Through Rate Prediction for Sponsored Search Advertising in Microsoft’s Bing Search Engine (CTR预估的应用比较吸引人,文章写得很棒,算法的效果也很好,只是干脆忽略的inference过程,有兴趣的同学可以参看我另一个文章,里面有一步一步推导的过程)

  6. Minka整理的EP学习资料:link (其中的包含了一个videolecture上他做的variance inference的talk值得一看)

  7. 本文的PPT: 墙外, 墙内

Classification Models

| Comments

During my past 3 years in career, following classifiers are often used for classification tasks.

Typcial classifiers comparision

Decision Tree

Decision Tree is not a start-of-art model for classification or regression, and when there are huge features(say millions) it will take a long time for training. But it may perform very well when the number of distinct features are limited, and the classification/regression task is obviously non-linear.

A typical scenario is multi-model fusion: you have trained multiple models for single task, and you want to generate the final prediction result using all these models. Based on my past experiments, Decision Tree can out perform linear model(linear regression, logistic regression and so on) on many datasets.

RDT, random forest, boosting tree

All of these 3 models are ensemble learning method for classification/regression that operate by constructing multiple Decision Tree at training time. For RDT(random decision tree), only part of total samples are used to training each tree. And all features are considered for splitting.

Similar with RDT, random forest also use part of total sampels to construct each tree, but it also only use subset of features/dimisions for splitting. So random forest introduces more ‘random’ factors for training, and it may perform better when there are more noises in training set.

boosting tree is actually forward stagwise additive modeling with decision tree as base learner. And if you choose exponential loss function, then boosting tree becauses Adaboost with decision tree as base learner. Here is one slide about additive model and boosting tree.

Generalized linear model

One of the most popular generalized linear model is logistic regression, which is generalized linear model with inversed sigmoid function as the link function. There are multiple different implementation for logistic regression, and here are some often used by me.

Logistic regression optimized with SGD.

It’s very basic, so I ignore the details here

OWLQN

It was proposed by Microsoft in paper Orthant-Wise Limited-memory Quasi-Newton Optimizer for L1-regularized Objectives of ICML 2007. You can also find the source code and executable runner via this link.

This model is optimized by a method which is similar with L-BFGS, but can achieve sparse model with L1 regularizer. I recommend you try this model and compare with other models you are using in your dataset. Here are four reasons:

  1. It’s fast, especially when the dataset is huge;
  2. It can generate start-of-art prediction results on most dataset;
  3. It’s stable and there are few parameters need to be tried. Actaully, I find only regularization parameters can impact the performance obviously;
  4. It’s sparse, which is very important for big dataset and real product. (Of course, sparse is due to L1 regularizer, instead of the specific optimization method)

One problem is it’s more challenge to implement it by yourself, so you need spend some time to make it support incremental update or online learning.

FTRL

It was proposed by Google via paper Ad Click Prediction: a View from the Trenches in 2013. I tried on my dataset, and this implementation can generate similar prediction performance with OWLQN. It’s quicker than OWLQN for training, and it’s also sparse. One advantage is it’s very easy to implement, and it support increamental update naturally. One pain point for me is this model has 3-4 parameters need to be chosen, and most of them impact the prediction performance obviously.

Ad predictor

This paper was also proposed by Microsoft in ICML 2009.

One biggest different with upper 3 implementation is it’s based on bayesian, so it’s generative model. Ad predictor is used to predict CTR of sponsor search ads of Bing, and on my dataset, it could also achieve comparable prediction performance with OWQLN and FTRL. Ad predictor model the weight of each feature with a gaussian distribution, so it natually supports online learning. And the prediction result for each sample is also a gaussian distribution, and it could be used to handle the exploration and exploitation problem. See more details of this model in another post.

Neural Network

ANN is so slow for training, so it’s tried only when the dataset is small of medium. Another disadvantage of ANN is it’s totally blackbox.

SVM

SVM with kernel is also slow for training. You can try it with libsvm.

Gaussian and Truncated Gaussian

| Comments

Everybody knows about Gaussian distribution, and Gaussian is very popular in Bayesian world and even in our life. This article summaries typical operation of Gaussian, and something about Truncated Guassian distribution.

Gaussian

pdf and cdf

Sum/substraction of two independent Gaussian random variables

Please take care upper formula only works when x1 and x2 are independent. And it’s easy to get the distribution for variable x=x1-x2 See here for the detils of inference

Product of two Gaussian pdf

Please take care x is no longer a gaussian distribution. And you can find it’s very elegant to use ‘precision’ and ‘precision adjusted mean’ for Gaussian operation like multiply and division. See here for the detils of inference

Division of two Gaussian pdf

Intergral of the product of two gaussian distribution

Truncated Gaussian

Truncated Gaussian distribution is very simple: it’s just one conditional (Gaussian) distribution. Suppose variable x belongs to Gaussian distribution, then x conditional on x belongs to (a, b) has a truncated Gaussian distribution.

Calculate expectation of Truncated Gaussian

Calculate variance of Truncated Gaussian

Bayesian CTR Prediction of Bing

| Comments

Microsoft published a paper in ICML 2009 named ‘Web-Scale Bayesian Click-Through Rate Prediction for Sponsored Search Advertising in Microsoft’s Bing Search Engine’, which is claimed won the competition of most accurate and scalable CTR predictor across MS. This article shows how to inference this model(let’s call it Ad predictor) step-by-step.

Pros. and Cons.

I like it because it’s totally based on Bayesian, and Bayesian is beautiful. Online learning is naturally supported, and the precition accuracy is comparable with FTRL and OWLQN. And both training and prediction is light-weight and fast. Btw: one shortage of this model is it’s not sparse, which may be a big issue when applied on big dataset with huge amount of features.

Inference using Expectation Propagation step by step

Firstly, following is the factor graph of ad predictor.

For each sample, we can use the formula of step 13 to update the posterior parameter of W, which is very easy to be implemented.

Prediction

After training, we can predict with following formula:

Prediction Accuracy

I compared it with FTRL and OWLQN on one dataset for age&gender prediction. AUC of this model is comparable with OWLQN and FTRL, so I recommend you have a try in your case.

Insights

  1. You can find variance of each feature increases after every exposure, which makes sense.

  2. This model shows samples with more features will have bigger variance, which does not make sense very much. I think the reason is we assume all the features are independent. Any insights from you?

Notes for Distributed System Theory

| Comments

过去三年参与的广告相关的项目都基于各种各样的分布式存储和计算系统,比如hdfs, hbase, cassandra cluster, memcached cluster, Druid, hadoop和spark。最近在研究各个系统的原理,周末浏览了一本电子书《分布式系统原理介绍》,介绍了很多重要的基础知识,推荐浏览。

Key words:

数据存储方式,consistent hashing,数据副本,副本控制协议,Lease机制,Quorum机制,日志技术,Paxos协议,CAP理论

consistent hashing: 分布式数据存取的经典方案

  1. 背景:数据的分布式存储的一种简单方式为hash, 这种方法简单,无需纪录数据存放在哪台node上。但是当集群需要拓展(或者减少)机器时,由于hash结果一般和机器数目有关,数据需要重新迁移;
  2. Consistent hashing将key组织成一个环,每个node负责一段连续的子环,当增加一个node时,只需要将临近的一个node上的部分数据copy到新node,不停机的情况下,对hit rate的影响明显减小;
  3. 需要额外存储的元数据只有node在环上的顺序,数据量小;
  4. 有一个缺点是:每次加入一个node,只能减轻现有的一个node的压力。且如果是随即分配node在环上的顺序,将很难保证在每个node的’负载均衡’;
  5. 一个较好的解决方案是引入’virtual nodes’: 首先假设有比真实nodes个数明显多的virtual nodes。这个个数是固定的,所以可以预先将其均匀地分布到环上。通过元数据将每个real node关联上多个virtual nodes,注意不是连续的,一般选在环上分隔较远的virtual nodes。这样的话,每加入一个real node,将会将属于其他real nodes的几个virtual nodes分配给新加入的real node,分担了多个real nodes的压力。反之,当一个real node失败,可以有多个real nodes来分担压力。

数据副本: 分布式系统提供高容错高用可行的重要手段

  1. 有2种最常见的数据副本存储方案,一种是以机器为粒度,一种是以数据块为粒度。
  2. 以机器为粒度的缺点:
    • 一旦某台机器数据丢失,数据恢复的效率不高,因为一般需要从非常有限台机器copy数据。而且会比较消耗copy from机器的资源,往往需要让一台机器下线,或者限制copy的速度;
    • 一旦某台机器宕机,压力被有限的几台副本机器分担(若3台机器互为副本,则剩余机器的数据访问压力提高50%)
    • 增加一台机器无法扩容,必须一下增加若干台机器(互为副本)
  3. 以数据块为粒度的好处:(相对应)
    • 一旦某台机器数据丢失,可以从剩余的所有机器上copy数据,数据恢复的效率高
    • 一台机器宕机,不会给任何单台机器增加明显压力;
    • 扩容容易

副本控制协议: 控制各副本数据读写行为,使得副本满足一定可用性和一致性

  1. 中心化副本控制协议:中心结点负责数据的更新,并发控制,协调副本一致性;单点故障
    • primary node在将更新同步到各个secondary nodes时,限于primary node的压力,往往只同步给有限几个secondary nodes,后续采用接力的方式
    • 同步过程的中间状态,包括同步失败的处理,以及access状态的返回,决定了系统的数据一致性
    • primary node宕机由于需要时间来发现(比如10s),在这段时间内无法更新数据
  2. 去中心化副本控制协议
  3. 实例 (大部分分布式数据存储系统都使用primary-secondary副本控制协议)
    • GFS: primary-secondary
    • PUNTS(yahoo!的分布式数据存储平台): 使用primary-secondary协议
    • Dynamo/Cassandra: 使用去中心化副本控制协议
    • Zookeeper: 使用去中心化协议选出primary node,之后就转变为中心化的副本控制协议

Lease机制:保证secondary nodes和primary node的一致性

  1. primary node在向cache nodes同步数据时,还会附带一个时间戳表达这份数据的有效期。在有效期内primary node保证不修改数据,cache nodes可以放心使用数据。
  2. 带来的cost是:若某个cache node提高修改元数据请求,primary node需要阻塞所有cache nodes对该份数据的读写请求,并等待到该份数据的lease超时才修改元数据。
  3. 所以lease的时长比较关键:太长会导致availability下降,太短会导致cache nodes频繁同步primary node;常使用10s
  4. lease机制用于primary node的选择:primary node的更改主要由于结点宕机,而传统的Heartbeat的方法不能有效监控结点状态(存在网络失败,监控机器本身性能问题导致的延时等),故每当nodes给监控机器发heartbeat时,返回一个lease,若primary node心跳失败,则等待lease过期后,监控结点更换primary node
  5. lease机制应用的实例
    • GFS中用于master挑选primary node
    • Chubby(google的分布式文件系统) 有2处使用了lease机制 a). secondary nodes承诺在一段时间内不重新选举primary node; b). primary node用于监控secondary nodes的状态
    • Zookeeper: primary node向client颁发lease

Quorum机制: 可用性和一致性的权衡

  1. 在存在N个副本的情况下,对于更新操作,只要在W在副本上更新成功,则算该更新成功(“成功提交的更新操作”)。当读取R个副本时(限制R+W>N),就可以保证可以读到更新之后的数据。
  2. 注意:仅依赖quorum机制无法知道最新成功提交的版本号,故无法保证强一致性(系统应该始终返回最新的成功提交的数据),需要通过其他方式获取系统最新成功提交的数据;
  3. Quorum机制体现了CAP理论中的availability(update时只需要更新了W个副本,read时只需要读取R个副本)和consistent(由于update或者read只需要在部分副本上成功即可,导致了仅follow Quorum机制不能保证强一致性)的权衡: