Dong Guo's Blog

我在滴滴遇到的技术挑战

| Comments

加入滴滴打车3个半月,感觉遇到和解决的技术问题超过之前1年的。写在这里给大家分享。

滴滴这边负责所有策略算法设计的是“策略组”,大概有20几个员工。由于滴滴的业务线越来越多(出租车,专车,快车,顺风车拼车,大巴),项目上线时间紧,没有时间对策略算法做最好的设计和优化。于是,新成立了一个通用模型组,目标是抽取出不同业务线的共同点,在一个更高的角度设计更好的策略算法,特别是提供通用的大规模机器学习支持。我是这个team第一个员工。

订单分配:

滴滴一个技术重点是订单分配,全国每天有几百万的乘客通过滴滴叫车出行,有近百万司机接滴滴的订单,如何将订单分配给司机使得更多的人更快地打到车?至少有如下问题需要考虑:

  1. 从大的层面,如何设计一套分配策略,能够保证目标最大?
  2. 从小的层面,分配订单时应该考虑到哪些因素?(距离?是否顺路?司机习惯偏好?天气?供求关系) 这些因素如何组合?
  3. 如何在更长的时间维度上做更优的分配?(比如,当前时刻将乘客A分给司机B是最优的,但几秒之后司机C出现了,司机C离乘客A要近得多)
  4. 拼车更环保也能帮乘客省钱,如何在订单分配中让尽可能多的人在保证体验的同时拼上车?TRB中有非常多的文献
  5. 乘客加价如何影响订单分配?
  6. 我们应该学习Uber的一些策略吗?(比如播单不告诉司机乘客的目的地)

在创业初期,可以用规则快速简单地实现,现在滴滴已经初步有了一套理论上保证收益的分配策略,需要我们进一步去优化效果和效率。

透露一下,在整体策略中,有一个部分是涉及到大规模机器学习:样本是几十亿级别,特征是亿级别(这是我进来的第一个项目)

动态调价:

设想在周五的傍晚,下班高峰,又开始下大雨。在国贸商圈有1000个用户通过滴滴叫车,而附近只有100辆车。如何做订单分配?应该把有限的车给谁?

首先,我们需要定义一个目标:动态调价的目的是什么?最大化成交量?最大化流水?最大化(愿加价)乘客打车的成功率?还是这几个目标的组合最合理?

选定目标之后,每个乘客应该加多少钱?一个优质订单是不是应该少加点?

滴米:

为了促进订单成交,除了给司机补贴和要求乘客加价,是不是还有别的激励方案?

于是滴滴牛逼的PM们推出了滴米这个牛逼的产品。滴米是一种虚拟货币,对于优质订单,一堆司机挤破头来抢,我们就扣他们虚拟货币,对于没人要的订单,我们就奖励滴米。这样就调节了优质劣质订单冰火两重天的不和谐局面。关键是,乘客和滴滴不用花一分钱!

产品很牛逼,策略上如何支持?一个订单发出前,如何确定其是扣滴米还是奖励滴米?扣多少奖多少?每个司机一样吗?整个策略会导致通货膨胀或者紧缩吗?

到达时间预估

预估司机从A点到B点的时间消耗,对滴滴挺重要。如果准确地预估?基于哪些数据和因素?这是一个机器学习问题吗?有更巧妙的预估方法吗?

工作感受

说了来滴滴3各月参与和了解的几个项目,我觉得都非常有意思,也非常有意义。说下来之后的几点体验:

第一,最大的体会是中国互联网行业,特别是滴滴,生机勃勃,有太多有挑战的事情等着做。产品和策略迭代非常快,基本上每天线上的策略设计和架构都会有一次优化上线,你每次改动就会影响每天几百万人的出行体验。

第二,相比我之前的工作,在滴滴工作会和不同岗位的同学紧密合作,每天和靠谱的策略组小伙伴一起做策略设计和讨论;和90后PM mm们讨论进度和策略设计;和QA团队合作测试,保证上线风险可控;和OP同学配合上线;

第三,滴滴的招聘质量提升非常快,3个月前我刚入职,周边同学大概还是百度同学的平均水平,现在我参与的面试,发的offer的质量基本和hulu差不多了。

最后,昨天滴滴大巴上线了,现在可供出行的产品线有出租车,专车,快车,顺风车,大巴。欢迎加入滴滴,在滴滴最美好的阶段,和牛逼的人做牛逼的事情,一起改变中国人的出行体验。有兴趣的联系我: guodong@diditaxi.com.cn

使用Shadowsocks科学上网

| Comments

离开了墙外的Hulu,以后科学上网需要自力更生了。昨天尝试了Shadowsocks,确实稳定易用价格低,推荐给有需求的同学。

Shadowsocks的介绍和安装过程(windows/MacOs/ios/Android)在这篇经典的文章中有详细的介绍:ShadowSocks—科学上网之瑞士军刀。亲测可靠。

Shadowsocks客户端启动之后,是一个三角箭头,需要在“Open Server Preference”里设置账号(包括IP, 端口,加密方式和账号密码)。

Shadowsocks账号设置

Shadowsocks的账号网上有机会找到一些免费的,不过都是和很多人共享,且不稳定(比如密码改变),靠谱的方式是在VPS(虚拟专用服务器)上部署自己的Shadowsocks服务。我用了这家:http://it-player.com/(非广告)的服务,一年几十块,很划算了。(如果你难得需要科学上网一次,你甚至可以使用它的半小时有效的临时账号)

这家的VPS里已经配置好了Shadowsocks服务的安装程序,只需要在网页操作鼠标操作就可以安装好,复制端口号,密码,和VPS的ip配置到本地Shadowsocks客户端。截图如下: 在VPS中安装配置Shadowsocks服务

配置好账号,就可以愉快地切换了,考虑到每个月有上百G的流量,默认一直科学上网好了。下载和youtube的速度都是刚刚的。

在VPS中安装配置Shadowsocks服务

我在第一份工作中学到了什么

| Comments

Hulu是我的第一份工作。从2011年1月开始实习,7月毕业后正式加入,到15年春天,正好是本科4年的长度,在这里对这4年作个总结。

1. 定位自己的技术领域

互联网行业太大了,从技术角度来说至少包含了硬件研发,前端,后端service,基础架构,数据平台,策略算法等领域。大部分人都不能做到在每一个领域精耕细作,可行的方式就是一段时间内聚焦在自己有激情(最好也有基础)的1个领域发力,关注另外1-2个领域的技术发展,同时follow整个行业的大的趋势和变化。

第一份工作的一个收获就是我确定了自己未来几年的技术定位:聚焦在策略算法(包括机器学习,数据挖掘,优化问题,策略设计等)上,关注数据平台和后端service的技术发展,了解整个行业大的趋势和变化。

2. 规划职业发展路线

规划规划就是在问:十年后你想做什么,成为什么样的人?这是比给自己技术领域定位更大更长远的问题。想明白这个问题可以让自己更有目标,在做选择的时候看得更远。

我的技术发展路线目前在大数据这块,从下面的基础架构,数据平台,到上面的策略算法,业务逻辑。大数据现在比较热,但是和每一次技术浪潮一样,这波浪潮也会过去,我的判断是大概5到10年。不是说5-10年后大数据没有用武之地,而是说这块的技术会越来越成熟,越来越工具化。

开源社区和一些相关的技术公司正在以风卷残云的速度推进基础架构和数据平台的稳定性和工具化:Clourdea和Hortonworks提供的工具让Hadoop的安装变得一键傻瓜式;Spark以飞快的速度变成稳定,可以部署在上千台机器在P级别的内存里计算;YARN集群的管理和维护可以通过图形界面方便地操作;Hbase发布了1.0版本,可以搞定T到P级别的数据存储访问;越来越丰富的内存数据库和列存储数据库可供选择。

上层的策略和算法也越来越成熟,特别是有丰富的lib来使用,大部分公司使用工业界经典的算法和经验,加上尝试学术界新的研究成果就可以解决大部分问题了(比如spark的mllib,在最新的1.3版本里实现了大部分经典的机器学习,推荐系统和数据挖掘算法)。我相信几年之内开源社区就会提供好用(相比weka)的工具让你在图形界面里通过鼠标拖拽和简单输入解决大部分ML和DM问题。对于很多优化算法,也有现成的实现,不需要自己去推导实现。

最终大数据这块在未来有挑战性的是业务逻辑,每个公司都有自己相对独特的业务,理清业务,分清主次,平衡商业和技术,利用大数据技术给公司创造最大价值是个人价值最大化的方式。这背后需要的是对大数据领域全面的了解,架构能力,商业思维和团队管理。这也是我目前的职业发展目标。

3. 个人技术发展 vs. 带团队

我身边有不少技术流同事(大多工作2-3年左右)比较排斥带团队,想要100%的精力放在技术钻研上,我非常理解,有一段时间我也这么想。现在我开始思考一个更好的平衡,身边也有同事作出了很好的榜样。

带团队对个人成长的益处是明显的:让自己有更高的视野,培养商业思维,锻炼leadership,更大地发挥自己影响力和价值的机会,在承担更多责任的同时也会获得更多的回报。

带团队一定会占用一定的精力:为团队制订目标,项目规划,团队建设,与团队成员定期沟通,为team负责,做一些没人愿意干的活。解决的方法有2个,第一是delegate工作,比如设立一个PM的角色负责项目规划和对外沟通,将TB的安排分配给某个细心热心的同事;第二个是更努力勤奋,在承担更多的责任带领团队奔向目标的同时还要提高技术的深度和广度,只能更加努力,这是职业生涯必须要经历的阶段。

4. 做一个积极的学习者

视野局限在手头的工作是不够的,跟住技术圈和行业的发展很有必要。下面的一些点有些是我在关注的,有些是需要加强的:

  • 和同事,特别是别的组的同事多多交流,了解公司各个部门和team在做什么,他们关注什么,有什么值得学习和合作的;
  • 积极参加公司内部的技术分享,特别是别的组的,用1个小时了解别人几个月做的事实在是很划算;
  • 订阅阅读,我在用feedly,比较好用,如果你关注大数据,可以订阅“Hadoop Weekly”, “Databricks”以及一些技术公司的技术博客;
  • 关注top conferences(ICML, KDD, AAAI, WSDM…), 90%的文章只需要看下标题,剩下的读读摘要和实验,需要精读或者实现的很少;
  • 关注github trend;
  • 微信的部分公众帐号
  • 重视动手实践,动手去试过,我才会认为自己真的了解;

每天保持阅读,follow技术进展和业界变化目前我做得还不够好,其实做了会发现也就是每天花半个小时,做与不做长期下来差别应该会很大。

5. 时间和效率管理

在Hulu的几年,时间管理上有一些心得,有些点执行得不够好,下一份工作要做到。

  • 每天早上的第一件事情就是做计划(前一天晚上应该更好),精确到半小时。按照优先级排序,每完成一项就标注,比较有成就感。注意尽力让自己不要被打断;
  • 给邮件分类,取消邮件提醒,避免被邮件打断,集中午饭后和晚饭后2个时间段阅读回复邮件;
  • 早上的时间最高效,我很享受很早到办公司,一个人把重要的事情先处理完,这样晚上甚至下午的时间可以用来读文章或者尝试新的东西;

6. 高质量带来高效率

质量体现在工作的每个细节中,在Hulu的几年深有体会,比如说发邮件,你的邮件有语法错误吗?你的邮件组织清晰吗?你的邮件里的每句话对方都关心吗(不关心的就要删除)?有把核心结论放在头部吗?你的观点有充分的数据支持吗?数据支持的图表美观易懂吗?我很感谢对我作出指点的前辈同事。质量还体现在你做的技术分享的质量,开会前的准备,代码的质量和效率,code review时的认真度等等。

高质量和高效率有一些不可调和的地方,这就需要依据事情的优先级来,比如code review需要的细致程度取决于代码的重要性,有没有别的高质量reviewers帮忙。但是很多时候高质量保证了长期的高效率,比如:

  • 如果代码比较烂或者跑得太慢,请务必集中时间立刻彻底改进它,否则这些代码以后会成为你的时间黑洞。

    实例:我曾经有半年的陷入在开发一个项目的某个模块(一个逻辑略复杂的spark应用),3个月开发完之后开始在集群上测试,跑得很慢,内存消耗也很大,但是勉强还能接受。虽然负责集群管理的team有所抱怨,自己一次完整的测试也要花4个小时,但是我还是没有足够重视。后来代码需要引入新的逻辑,由于之前代码质量不高,新逻辑的引入很痛苦,调试的时间也比较长。由于在spark集群中跑得时间太长内存消耗太大,经常会突然挂掉。挣扎了一段时间的小修小补后,终于下定决心梳理逻辑,重构代码,彻底修改了spark程序的并行逻辑,执行时间下降到了半个小时,内存使用大大减少,代码逻辑也简单很多了,这个改变要是在早期就做,肯定能节省很多时间。

  • 在动手进行正式编码开发前,确保对数据做细致的分析,否则可能浪费掉一周的编码时间(不要假设任何来源的数据是没有问题的);

  • 你可以先用某个新语言或者工具,但是有时间了请务必搞透它,否则以后会付出代价;

    实例:一直有一个坏习惯:每次遇到问题去google,而不是把东西研究透。在1年多前开始接触scala,草草地看了一本书,也写了一些比较小的应用,一直没有细致研究它。后来趟了无数坑。

  • 将能自动化的一切自动化(一个典型的例子是机器学习的实验,从数据准备到测试到发实验报告整个过程在自动化后可以节约大量时间,提高了后续实验的效率)

7. 做技术总结和分享

技术总结和分享可以梳理加深自己对知识的理解,纪录自己的成长,同时还是很好的提升自己影响力的机会。如果是通过演讲的方式分享,由于自己理解了和让别人理解不是一个难度,可以进一步加深自己的理解。锻炼自己成为一个好的演讲者(这非常重要)。

技术分享不仅仅是给大家讲调研了什么新的技术,读了什么nb的papers,还包括推动新技术,算法,代码库,工具在team和公司的使用。

8. 成为让别人和公司信赖的人

2013年一帮老朋友离开Hulu,都是和我合作比较多,对我帮助比较大的,第一份工作中遇到这种情况对我的影响还是比较大的。公司做了组织结构的调整,有一些别的组的同事调整到广告组来,在这个特殊阶段,我还算不错地扮演了team核心的角色,帮助公司让这个team稳定并一步步壮大起来。

在努力成为让公司信赖让别人可依赖的人的过程中,我成长了。这边总结下自己做得不够好的地方:1). 从心态上更好地调整好leader的心态; 2). 做判断需要更果断; 3). 更积极地协调大家的工作,特别是实习生的工作,让大家的效率更高; 4). 更好地和PM协调好工作分配

9. 扮演好自己的角色

在工作中,你需要相处的角色有4类,第1类是自己的下属,第2类是组内的PM(programe manager以及product manager),第3类是其他组需要合作的同事,第4类是自己的老板。过去4年有很多心得,也有不少教训。

  • 和自己的下属:需要明确自信地宣称你是老大,你为这个team以及大家的成长和发展负责,定期的沟通,及时指出问题,保证每个人有正确的方向,做的事情符合优先级顺序,有产出,帮忙解决block issue。当然为team制定中长期计划,争取资源和项目也是非常重要的;
  • 和组内的PM:需要一开始就划清职责边界,什么事情由谁负责决定,避免以后工作中出现职责不清或非良性得竞争。要敢于将工作delegate出去,当然要做到放得出去收得回来;
  • 和其他组合作的同事:平时要注意建设好关系,不能需要支援的时候才联系对方。合作的过程中要多从对方的角度思考,对方为什么要合我合作这件事?对方关注什么?对方能得到什么?不能suppose对方有义务积极配合自己;
  • 和自己老大:多沟通,争取主动权和控制权,学会向上manage;

Druid Cluster Setup

| Comments

本文介绍如何搭建Druid cluster,Druid的介绍与应用见另一篇文章

Druid的官网也有详细的文档,建议浏览一遍。本文对关键部分做一些梳理,总结一些比较坑的点。

机器准备

Druid包含若干个services和nodes,我的配置如下(如果没有多个机器,当然可以将所有模块都起在一台机器上)

  • services/nodes on machine1: Mysql server, Zookepper server, coordinator node, overlord node (indexing service)
  • services/nodes on machine2: Historical node, Realtime node
  • services/nodes on machine3: Broker node

3台机器都安装配置好java (how)

安装配置依赖

mysql配置

按照Druid的文档安装mysql并创建一个新的用户druid/diurd。理论上Druid在后续步骤会在database druid中创建3张表druid_config, druid_rules和druid_segments。如果最终你发现没有这3张表,可以手动创建。

安装Zookeeper

安装启动,无坑

Deep storage

如果是local模式(全部都在一台机器上),使用本地磁盘作为deep storage是最简单的,对于cluster,较简单的方式是大家(indexing services, historical node, realtime node)挂载一块公共的磁盘(比如nfs方式),这样historical node就可以同步deep storage上的segments,realtime node也可以将segments同步到deep storage上来。

在实际应用中数据量通常比较大,常常会使用hdfs作为deep storage,为了能够将segments写入到hdfs中,

配置启动Druid各个nodes

对于如下每个node/service,Druid都有一个配置文件runtime.properties(较新的版本将一些公共的配置提取了出来),每个node/service都配置下druid.zk.service.host为zookeeper的地址。

  • coordinator node: 无坑,在machine1上启动
  • historical node: Druid默认Deep storage数据路径为/tmp/druid/localStorage, 可通过配置druid.storage.storageDirectory=XXX来覆盖。

    druid.storage.type=local druid.storage.storageDirectory=/mnt/data/druid/localStorage druid.segmentCache.locations=[{"path": "/mnt/data/druid/indexCache", "maxSize"\: 10000000000}] 如果deep storage是hdfs,则修改druid.storage.type=hdfs,druid.storage.storageDirectory为hdfs上的路径

  • broker node: 无坑,在machine3上启动;

  • indexing service:

    druid.indexer.task.hadoopWorkingPath=hdfs://elsaudnn001.prod.hulu.com/user/guodong/druid druid.storage.type=hdfs druid.storage.storageDirectory=hdfs://elsaudnn001.prod.hulu.com/user/guodong/druid 对应deep storage是hdfs

  • realtime node: 需要使用kafka,参考官网文档即可;

数据导入

使用indexing services是Druid推荐的数据导入方式,数据的input和output都可以是本地/挂载磁盘或者hdfs。 如果要读写hdfs,需要保证druid引用的hadoop版本和你使用的版本一致。

另外Druid引用了2.5.0版本的protobuf,而2.1.0之前版本的hadoop使用的是更老的protobuf版本(如2.4.0a),如果你遇到protobuf版本冲突的问题,需要修改druid的pom.xml重新打包

参考

  1. 官方文档
  2. google groups讨论区

在线广告中的cookie Matching

| Comments

用户定向是在线广告的核心优势之一,数据是用户定向的基础,而cookie matching技术可以将用户在各个站点上的数据关联在一起,使得re-targeting成为可能。

cookie matching有很多的应用场景,典型的有2种,一种是在DMP(Data Management Platform)生态中,另一种是在RTB(Real-time bidding)中。下面介绍下在这2种场景中cookie matching是如何实现的。

  1. 用户U访问jd.com, jd从用户browser中获取jd_cookie_id(jd.com的cookies id);
  2. jd的页面中预先嵌入了BlueKai的js脚本,会有一个302重定向请求转发给BlueKai, 用户的browser中会生成BlueKai的cookies,同时用户的jd_cookie_id会被发送给BlueKai;
  3. BlueKai在其后端service中纪录下BlueKai_cookie_id和jd_cookie_id的映射关系
  4. 用户U某一次去了yahoo.com浏览新闻,假设事先yahoo和jd签了一笔重定向的广告订单
  5. yahoo的ad server在给用户U挑选广告前,访问BlueKai server,BlueKai会在其数据库中检索Bluekai_cookie_id对应了哪些站点的cookie_id
  6. BlueKai给yahoo ad server返回用户U的tags(包含了哪些站点的cookie_id),如果其中包含了jd_cookie_id,则jd的广告可能会播放给该用户看

  1. 用户U访问jd.com, jd用从户browser中获取jd_cookie_id(jd.com的cookies id);
  2. jd的页面预先嵌入了PinYou的脚本,同样的会为BlueKai生成cookie,同时请求Pinyou分配cookie mapping任务;
  3. Pinyou给jd返回一个beacon,其中包含ad exchange地址,和用户U的Pinyou_cookie_id;
  4. jd会通过该beacon向DoubleClick发送cookie matching请求,包含了pinyou_cookie_id;
  5. doubleclick通过302重定向向Pinyou发送doubleclick_cookie_id;
  6. Pinyou在其数据库中存储doubleclick_cookie_id和pinyou_cookie_id的映射关系;
  7. 用户U某一次去yahoo.com浏览新闻,yahoo事先接入了double click广告平台售卖广告;
  8. yahoo的ad server会向double click发送广告请求,double click会将用户U的doubleclick_cookie_id发送给Pinyou等DSP, Pinyou通过cookie matching数据库找到pinyou_cookie_id, 再检查其对应了哪些站点的cookie_id,如果包含了jd_cookie_id,Pinyou就可能会为jd的广告竞争该广告位
  9. double click返回挑中的广告让yahoo播放

Spark使用经验分享

| Comments

Spark是一个基于内存的分布式计算engine,最近1-2年在开源社区(github)和工业界非常火,国内的一些公司也搭建自己的spark集群。典型的应用场景是大数据上的机器学习模型的训练以及各种数据分析。下面是我理解的spark的优势:

  1. Spark使得分布式编程更简单

    Spark将实际分布在众多Nodes上的数据抽象成RDD(resilient distributed dataset),使得我们可以像本地数据一样进行处理。同时,Spark提供了相比MapReduce更丰富的API,相比MapReduce编程更加简单。

  2. Spark通过充分利用内存提高计算效率

    随着数据量越来越大,内存越来越便宜,使用较多的内存让(某些类型的)计算效率提升10至100倍,对很多公司来说是比较划算的。Spark和Facebook的Presto都基于这样的思想。在Spark中,你可以指定将那些在后续需要被多次使用的RDD缓存在内存中,减少了IO的开销,可以显著提高如机器学习模型训练这种需要迭代计算的应用的效率。

  3. Spark提供了一整套的数据分析和计算解决方案,降低了学习和维护成本

    • Spark本身支持做batch的计算,比如每天机器学习模型的训练,各种数据的处;
    • Spark Streaming可以用来做realtime计算和数据处理,Spark Streaming的API和Spark的比较类似,其实背后的实现也是把一段段的realtime数据用batch的方式去处理;
    • MLlib实现了常用的机器学习和推荐算法,可以直接用或者作为baseline;
    • Spark SQL使得可以通过SQL来对Hive表,Json文件等数据源进行查询,查询会被转变为一个Spark job;
    • 还有GraphX, 我没有用过,其用于一些图相关的计算;

  4. Spark可以和MapReduce通过YARN共享机器资源

    所有的存储(HDFS),计算,内存资源都可以共享

个人使用Spark的一些经验总结

  1. 理解spark application的运行原理, 可以避免犯很多错误 Driver中涉及到RDD操作的代码(比如RDD.map{}中的代码)需要Serialize后由Driver所在的Node传输给Executors所在的Nodes,并做Deserialize后在executors上执行,RDD操作中涉及到的数据结构,比如map中用到了一个user_id –> user_profile的hashtable,也需要由Driver所在的Node传输给Executors所在的Nodes。理解了这点就可以更好理解下面2点分享

  2. 保证Rdd操作中的代码都是可序列化的,否则会有NonSerializableException

    一种常见的错误是,在rdd1.map{objectOfClassA.fun}中,对象objectOfClassA所属的类ClassA需要是可序列化的,这也以为ClassA中用到的所有成员属性都是可序列化的。如果classA使用的某个成员属性无法序列化(或者标识为Serializable),scala中可以通过@transient关键字标明序列化ClassA时不序列化该成员变量。推荐stakoverflow的2个讨论:link1 link2

  3. 正确地使用广播变量(broadcast variables)

    如果我们有一份const数据,需要在executors上用到,一个典型的例子是Driver从数据库中load了一份数据dbData,在很多RDD操作中都引用了dbData,这样的话,每次RDD操作,driver node都需要将dbData分发到各个executors node一遍(分享1中已经介绍了背景),这非常的低效,特别是dbData比较大且RDD操作次数较多时。Spark的广播变量使得Driver可以提前只给各个executors node传一遍(spark内部具体的实现可能是driver传给某几个executors,这几个executors再传给其余executors)。使用广播变量有一个我犯过的错误如下:

     val brDbData = sparkContext.broadcast(dbData) //broadcast dbDataA, and name it as brDbData
     val dbDataB = brDbData.value //no longer broadcast variable
     oneRDD.map(x=>{dbDataB.getOrElse(key, -1); …})
    第一行将dbData已经广播出去且命名为brDbData,一定要在RDD操作中直接使用该广播变量,如果提前提取出值,第三行的RDD操作还需要将dbData传送一遍。正确的代码如下
     val brDbData = sparkContext.broadcast(dbData) //broadcast dbDataA, and name it as brDbData
     oneRDD.map(x=>{brDbData.value.getOrElse(key, -1); …})

  4. 使用yarn-client或者yarn-cluster模式运行spark应用之前,在IDE中配置spark local模式调试以及测试好代码

    spark的yanr-client或者yarn-cluster模式做一次测试比较耗时,因为涉及到代码打包以及上传。在IDE(推荐IntelliJ)中配置local模型用于debug和测试,将显著提升开发和测试效率;

    在VM option中配置:”-Dspark.master=local -Dspark.app.name=Test -Xmx2G” (also increase maximal memory for Heap)

  5. 充分利用spark的并行性

    理想的情况是整个代码的逻辑是对一个或几个RDD做处理,这时候spark的并行性往往是充分利用的。有时候代码逻辑会更复杂,比如你需要统计一年中每一天的一些数值,由于代码逻辑比较复杂,一种简单的“偷懒”方式是用一个for循环,在for循环内部做RDD的操作,这种情况是要努力避免的,务必思考将不同date的统计并行化。我写过的两个应用中都遇到了这种情况:优化之后速度提升非常明显。

  6. 使用cache()操作

    cache RDD需要考虑自己有多少内存,对于后续不需要多次使用的RDD不要cache,如果内存有限却又指定要cache,大量的时间将被花在memory和disk的in-out上

  7. 为spark-submit选择合适的参数

    spark-submit用于提交spark job,其可配置为job申请多少资源,包括Driver的内存和cpu,executor的个数,每个executor的内存,cpu和线程数。如果使用yarn做资源管理,只有内存是硬性占有的,一个job过多地申请内存,将会有资源浪费,可能会使别的job因为申请不到足够的内存无法跑。可以用JMX(Java Management Extension)来监控你的spark job到底消耗多少内存,可以指导你申请合适的内存大小。

  8. Spark可以访问众多的数据源:比如HDFS, HBase, Cassandra或者Hive表(Hive on Spark), 直接得到一个RDD用作后续处理

Programmatic Digital Advertising

| Comments

从2010年开始开始工作,一直在做搜索广告,联盟上下文广告以及视频广告。这几年RTB,或者更广义的广告的程序化交易非常火。在这边总结一下我的理解。

“程序化交易“即通过程序自动化地完成商品的买和卖,这在很多领域都有应用,最经典的就是金融市场中的股票,债券,货币,期货等的交易。其至少有2个好处,第一:将人从交易的各个环节中解放出来,让一切变得自动化,有效率;第二:这种程序带来的自动化使得更大范围(甚至全球)资源的优化配置变得有可能。

广告也不例外。相比于传统线下媒体(比如电视)广告,互联网广告(这里我们默认是显示广告,不包括由几家巨头独营的搜索广告市场)有很多优势,最显著的也是2点,第一:用户在互联网的数据可以被用来做广告的精准投放,比如你在浏览很多网站的时候都有可能看到京东的广告,而广告的内容是你之前在京东浏览过的商品;第二:也是由于数据的获取更方便,互联网广告的质和量能更容易被度量,比如广告主可以仅为点击而不是展示付费,且点击次数很容易track。

当然这还不够完美,对于一个广告主,其终极目标是把自己的广告投放到全球最相关的用户上,可能出现在上万个不同的站点上,而自己只需要定义每次展示,点击,或者转化的付费(当然也可以为不同的站点指定不同的单价),广告主或者其代理不需要和各个内容提供商或内容提供商的代理分别扯皮。联盟广告(如百度联盟,阿里妈妈)实现了广告交易的程序化,但仅是其中特殊的一种。

在线广告程序化交易

在线广告程序化交易有如下一些stakeholders: DSP(Demand side platform)可以认为其是一个有技术背景的代理商,广告主可以选择为展示,点击或者转化进行付费,DSP则和其他众多DSP通过竞价竞争广告位。DSP的数据和技术积累直接关系到其是否NB. SSP(Supply side platform)则将publishers进行整合。Ad Network在这个生态里扮演一个中枢的角色。

注意为了实现广告的程序化交易,除了advertisers和publishers, 其他角色都不是必须的,因为广告主和内容提供商之间完全可以直接交易。对于百度联盟,可以认为其同时扮演了Ad Network和DSP的角色,如果你既有内容资源又有广告主资源,你当然不希望和别人分享收益。

可以按照2个纬度(广告投放量是否被保证,以及是固定单价还是需要竞价)将在线广告的程序化交易分为4个类别: reference

Inventory即广告流量,广告主往往比较关心广告投放量是否能够放完,特别是对于大的品牌广告主。保证广告投放量的deal就是传统的Guaranteed deal. 对于广告商来说,其为广告的付费一直是fixed,其可以选择是为每次展示付1毛钱还是为每次点击付10块钱。这个表格中提到的Pricing指的是publisher或者SSP端看到的报价,而这个价格通常指的都是展示的单价(CPM),如果广告主选择以CPM付费(给DSP或者SSP或者直接给publisher),publisher端看到的该由该advertiser声称的报价通常是稳定的,但是如果广告主选择以点击甚至转化付费,DSP对不同的广告位的出价的差别就很可能由很大的差别。

Automated Guaranteed相比传统的广告售卖方式差别不大,主要是让流程变得自动化。注意premium内容的售卖通常是这种方式(比如Hulu以及yahoo首页),这就意味着程序化交易的scope超脱了狭义的RTB, 也适用并有利于premium内容的售卖;

Unreserved Fixed Rate由比较大的实际意义,对于premium的内容,其大部分的Inventory还是会通过传统的方式一笔一笔和大广告主谈(大广告主的deal细节比较复杂),剩余有多少蛋糕可以卖给中小广告主?很难保证,所以通过不保量的方式程序化售卖出去是一个很理想的选择;

Invitation-Only Auction和Open Auction差别不大,只是前者限制只接入部分DSP或者advertisers,2者都是我们常说的RTB。

实时竞价(Real-Time-Bidding)

这边不展开介绍,有2个资源对RTB有很清晰的介绍

Youtube: how an Ad is Served with Real Time Bidding

iPinyou 沈学华的一篇科普

RTB这块对数据的要求还是比较高的。涉及到全互联网的数据共享,用户隐私又是一个棘手的问题。想想央视某年的315报道。

新变化和问题

相比RTB一直给人在垃圾流量做买卖的印象,越来越多高质量的publisher愿意接入到Ad Network中,小广告主也有机会了,程序化交易越来越普及;

质量的控制,包括广告的质量以及内容的质量;标准的统一,比如广告位的尺寸,视频广告的清晰度和风格等; 数据的获取以及隐私;

一些推荐的资源

Interactive Advertising Bureau: 一家致力于建立标准,推动在线广告行业发展的公司 IAB programmatic

TOP 10 thinks you need to know about programmatic

Youtube: how an Ad is Served with Real Time Bidding

iPinyou 沈学华的一篇科普

TubeMogul, 一家典型的DSP公司

Challenges for a Word-class Ad Inventory Forecasting System

| Comments

Key words: Ad Inventory Forecasting, Ad serving, Online advertising.

广告流量预估是各家采取保量模式售卖广告位的公司都必须要做的,无论是传统的电视媒体还是各家互联网公司。 一句话介绍就是给定未来任意一段时间区间(通过在一年内),在任意给定定向条件(比如demographic限定,geographic限定,上下文内容限定,平台限定,时间段限定等)下,预估对于各种形态的广告(比如视频,图片,文本广告)分别有多少可以卖。

实现一个优秀的广告流量预估系统的挑战在什么地方呢?至少包括如下几点

  1. 在大数据量下保证快速的查询响应时间

    • 大数据体现在2点,首先是广告数据条目多,另外是定向条件多。当有100类定向条件,每类可以有2种取值时,不同定向条件的组合数目虽然不会到2100级别,但到billion级别还是可能的。如何做到近于实时的查询?

  2. 复杂多样的干扰因素对预估准确性的影响

    • 业务本身的波动性对广告流量的影响
    • 业务变动对广告流量的影响
    • 突发时间对广告流量的影响

  3. 具体业务逻辑的复杂性增加了系统逻辑的复杂性

    • 典型的业务流程是来了一个广告订单,在系统种查询是否有足够的流量可以售卖,但是查询得到的流量是满足定向条件的总流量,而单个订单的在投放过程种会有各种约束,比如不能给单个用户在一天中重复播放同一个广告商的广告。所以实际能够售卖给该订单的广告量一定少于查询到的总流量。这就需要在预估中考虑广告播放的频率限制;

  4. 和Ad server(广告投放服务器)逻辑的协调

    • 通常同一个广告位会有多个广告qualify,Ad server决定了具体放哪个广告。在ad server逻辑不发生变化的情况下,可以利用历史数据(广告总量在各个定向条件上的分布)进行预估,但是一旦ad server逻辑发生变动,广告流量预告系统最好能实时作出调整,而不是收集了一个月数据之后才发应过来。

  5. 流量预估只是第一步,流量的管理或者说全局的统筹优化是最大化收益的必要。

Druid介绍与实践

| Comments

关键词

Druid, column-stores, distributed system, bitmaps indexing

应用场景

最近在设计一个系统来预估未来一年的广告流量,不是总流量,是任意时间段任何定向(Targeting)条件约束情况下的流量。定向条件有近百种(内容类别,设备平台,用户地域,用户人口属性等),整个时间区间不同组合数(也就是数据行数)是亿级别。目标是秒级的查询响应时间。一个简单的数据例子如下:

存储系统选择

Mysql不是适合的选择

最容易想到的是用Mysql作为数据存放和查询引擎,由于数据行数太多,Mysql必须通过创建索引或者组合索引来加速查询。典型的查询包含若干个定向类别,这些定向条件的组合是非常多的(top 80%的查询也会包含几十种组合),故需要创建非常多的组合索引,代价很高。另外,对于那些没有创建组合索引的查询,查询时间完全不能接受。实际测试结果是加了组合索引后整体查询速度提升有限。

为什么没有用Hbase或者Hive

Hbase本身是一个经典的基于hdfs的分布式存储系统,通常来说其是行存储的,当创建column families之后,每个column family是列存储的(代价就是当通过key查询某行的时候,需要从多个不连续的存储空间读数据,具体可参考)。在这个应用中,可以为每个定向类别(包括日期)创建一个单独的column family,但是据我所知Hbase本身没有为column family创建bitmap indexing(参考),查询速度应该会受到影响。另外不用Hbase的一个原因是我希望存储系统尽量轻量级,最好不要安装hadoop。

Hive将查询转化为M/R任务,没法保证查询的快速响应(比如M/R cluster资源竞争很激烈时),而且使用Hive需要以来hadoop cluster,对这个应用来说也略微重量级。

我们需要一个高可用的分布式的列存储系统

我们的核心需求包含2点,一是查询速度快,二是系统的拓展性好,最好是分布式的。

第一点要求意味着最好用column-store而不是row-store,在这个应用中,虽然定向类别有近百种,但是单次查询通常只会涉及几个。对于修改操作较少且查询往往只涉及少数几列的场景使用column-store可以获得快一个量级的查询速度。而且column-store可以通过bitmap indexing,encoding,以及compression来优化查询速度和存储开销。还存储还是列存储

第二点要求一方面是由于我们的数据量较大,并行存储和查询可以减少时间开销,另一方面是数据量每年还在快速上涨,以后可以简单地通过加机器来应对。

对系统的其他要求比较普遍:系统可用性要高,稳定,轻量级,易于上手。

为什么Druid是适合的选择

Druid满足我们上面2点要求,其是一个开源的、分布式的、列存储系统,特别适用于大数据上的(准)实时分析统计。且具有较好的稳定性(Highly Available)。 其相对比较轻量级,文档非常完善,也比较容易上手。

Druid介绍

如何搭建一个Druid cluster请参考我另一篇文章

概念

Segment: Druid中有个重要的数据单位叫segment,其是Druid通过bitmap indexing从raw data生成的(batch or realtime)。segment保证了查询的速度。可以自己设置每个segment对应的数据粒度,这个应用中广告流量查询的最小粒度是天,所以每天的数据会被创建成一个segment。注意segment是不可修改的,如果需要修改,只能够修改raw data,重新创建segment了。

架构

Druid本身包含5个组成部分:Broker nodes, Historical nodes, Realtime nodes, Coordinator Nodes和indexing services. 分别的作用如下:

  • Broker nodes: 负责响应外部的查询请求,通过查询Zookeeper将请求划分成segments分别转发给Historical和Real-time nodes,最终合并并返回查询结果给外部;
  • Historial nodes: 负责’Historical’ segments的存储和查询。其会从deep storage中load segments,并响应Broder nodes的请求。Historical nodes通常会在本机同步deep storage上的部分segments,所以即使deep storage不可访问了,Historical nodes还是能serve其同步的segments的查询;
  • Real-time nodes: 用于存储和查询热数据,会定期地将数据build成segments移到Historical nodes。一般会使用外部依赖kafka来提高realtime data ingestion的可用性。如果不需要实时ingest数据到cluter中,可以舍弃Real-time nodes,只定时地batch ingestion数据到deep storage;
  • Coordinator nodes: 可以认为是Druid中的master,其通过Zookeeper管理Historical和Real-time nodes,且通过Mysql中的metadata管理Segments
  • Druid中通常还会起一些indexing services用于数据导入,batch data和streaming data都可以通过给indexing services发请求来导入数据。

Druid还包含3个外部依赖

  • Mysql:存储Druid中的各种metadata(里面的数据都是Druid自身创建和插入的),包含3张表:”druid_config”(通常是空的), “druid_rules”(coordinator nodes使用的一些规则信息,比如哪个segment从哪个node去load)和“druid_segments”(存储每个segment的metadata信息);
  • Deep storage: 存储segments,Druid目前已经支持本地磁盘,NFS挂载磁盘,HDFS,S3等。Deep Storage的数据有2个来源,一个是batch Ingestion, 另一个是real-time nodes;
  • ZooKeeper: 被Druid用于管理当前cluster的状态,比如记录哪些segments从Real-time nodes移到了Historical nodes;

查询

Druid的查询是通过给Broker Nodes发送HTTP POST请求(也可以直接给Historical or Realtime Node),具体可见Druid官方文档。查询条件的描述是json文件,查询的response也是json格式。Druid的查询包含如下4种:

  • Time Boundary Queries: 用于查询全部数据的时间跨度
  • groupBy Queries: 是Druid的最典型查询方式,非常类似于Mysql的groupBy查询。query body中几个元素可以这么理解:
    • “aggregation”: 对应mysql”select XX from”部分,即你想查哪些列的聚合结果;
    • “dimensions”: 对应mysql”group by XX”,即你想基于哪些列做聚合;
    • “filter”: 对应mysql”where XX”条件,即过滤条件;
    • “granularity”: 数据聚合的粒度;
  • Timeseries queries: 其统计满足filter条件的”rows”上某几列的聚合结果,相比”groupBy Queries”不指定基于哪几列进行聚合,效率更高;
  • TopN queries: 用于查询某一列上按照某种metric排序的最常见的N个values;

本文小结

  1. Druid是一个开源的,分布式的,列存储的,适用于实时数据分析的系统,文档详细,易于上手;
    • Druid在设计时充分考虑到了Highly Available,各种nodes挂掉都不会使得druid停止工作(但是状态会无法更新);
    • Druid中的各个components之间耦合性低,如果不需要streaming data ingestion完全可以忽略realtime node;
    • Druid的数据单位Segment是不可修改的,我们的做法是生成新的segments替换现有的;
    • Druid使用Bitmap indexing加速column-store的查询速度,使用了一个叫做CONCISE的算法来对bitmap indexing进行压缩,使得生成的segments比原始文本文件小很多;
  2. 在我们的应用场景下(一共10几台机器,数据大概100列,行数是亿级别),平均查询时间<2秒,是同样机器数目的Mysql cluter的1/100 ~ 1/10;
  3. Druid的一些“局限”:
    • Segment的不可修改性简化了Druid的实现,但是如果你有修改数据的需求,必须重新创建segment,而bitmap indexing的过程是比较耗时的;
    • Druid能接受的数据的格式相对简单,比如不能处理嵌套结构的数据

参考资料&推荐阅读

  1. 官方文档
  2. google groups讨论区
  3. Druid: A Real-time Analytical Data Store
  4. Bitmap indexing wikepedia
  5. Bitmap indexing compression algorithm used by Druid
  6. 行存储or列存储?