Yelp的数据管道开源了:ETL已死,实时流技术永生? | 数盟社区

作者:足下 来源:InfoQ

这一系列文章原发表于Yelp的技术博客,经Yelp授权,由InfoQ中文站社区编辑足下翻译并分享。

在过去的几个月里,Yelp一直在不断地向大家分享着他们的数据管道的情况。这是一套基于Python的工具,可以实时地将数据流向需要它们的服务,并随之加以必要的数据处理。这一系列的文章讲述了如何从MySQL中将消息复制出来、如何跟踪模式的变化及演进、并最终分享了如何把数据接入到Redshift和Salesforce等不同类型的目标存储。

幸好Yelp非常慷慨,他们不只是分享了自己的设计思路和经验,更是赶在圣诞节之前向大家献上了一份大礼,把主要模块开源出来了!

在读过了所有这些关于我们的数据管道的文章之后,可能你会觉得我们这些Yelp的人不过是像一个孩子在向大家炫耀他的新玩具一样,肯定会自己捂得严严的,不会和大家分享。但是和大多数有了新玩具又不会分享的孩子一样,我们愿意分享——所以我们最终决定要把我们的数据管道的主体部分开源出来,然后大家就可以开心地迎来新年假期了。

闲话少说,下面这些就是Yelp为大家的假期准备的圣诞礼物:

MySQL Streamer会不断地查看MySQL的binlog,得到最新的表变更操作。Streamer负责捕获MySQL数据库上的单条数据更改操作,并把它封装成Kafka消息,发布到Kafka的某个Topic中。如果有必要,也会做模式转换。

Schematizer服务会跟踪每一条消息所使用的模式信息。在有新模式出现时,Schematizer会处理注册消息,并为下游的表生成更改计划。

Data Pipeline clientlib为生产和消费Kafka消息提供了非常易用的接口。有了clientlib,就再也不必关心Kafka的Topic名字、加密或消费者程序的分区情况了。你可以站在表和数据库的角度去考虑问题,不必关心其它细节。

Data Pipeline Avro utility包提供了读写Avro模式的Python接口。它也可以为表的主键等模式信息提供枚举类,这一点Yelp在实践中发现非常有用。

Yelp Kafka库扩展了Kafka-python包,并提供了多重处理消费者组等新功能。这个库可以帮助大家非常高效地与Kafka进行交互。这个库也让用户可以判断出Yelp内部的Kafka跨区域部署情况。

数据管道中不同组成部分的概览图。单个服务用方形表示,而共享包用圆角表示。

这些项目每个都有Docker化的服务,你可以很容易地把它们用到你的架构中。我们希望对于每个用Python构建实时流处理程序的开发者来说,它们都能有用。

有了之前的文章介绍,现在又有了开源的代码,相信有许多数据处理工程师的圣诞假期都会过得无比充实了。

Yelp的系列文章深度讲解了他们如何用“确保只有一次”的方式把MySQL数据库中的改动实时地以流的方式传输出去,他们如何自动跟踪表模式变化、如何处理和转换流,以及最终如何把这些数据存储到Redshift或Salesforce之类的数据仓库中去。

这一系列共六篇。

第一篇:每天处理几十亿条消息:Yelp的实时数据管道

第二篇:Yelp的实时流技术:利用MySQLStreamer将数据库变更发送给Kafka

第三篇:从MySQL到Kafka,如何管理每天实时发布的几十亿条消息?

第四篇:实时计算和数据转换,为何Yelp弃用Storm和Heron,自建流处理器PaaStorm?

小树分割线

第五篇:数据管道之Salesforce Connector

Yelp用客户关系管理(customer relationship management,CRM)平台Salesforce来为超过2000人的销售团队提供支持。Salesforce提供了许多现成的功能,让我们的销售团队可以轻松地定制他们的业务流程。

我们的销售团队主要做什么呢?他们卖广告包!他们卖给谁呢?Yelp上的商家!那我们如何从Yelp的数据库中获取这些商家信息并提供给Salesforce呢?读过下文你就明白了。

以前的方法

我们现有的单向同步基础架构名为“Bulk Workers”,是早在2010年设计的了,目的是要显著地改进端到端发送数据的时间。这套设计方案成功地把同步时间从3星期缩短为24小时,这很棒!这套方案细节是怎样的呢?

这些“Bulk Workers”实际上就是一些Gearman定时任务,它们会检索业务表中的每一行数据,再按Salesforce的模式对这些数据进行转换。转换后的数据会用一个Salesforce客户端发送给Salesforce,客户端是基于Beatbox开发的,主要的改进是增加了对Salesforce批量API的支持。

旧的集成方法

上面的方法一直工作得很好……直到它再也顶不住的时候。

在接下来的5年里,这套架构见证了数据的急速增长,从最初的30万亿行,到2015年超过了100万亿行。数据更新的操作影响尤其大。后来,同步时间开始越拖越长。我们就知道我们需要优化一下系统,来更好地支持更新操作了。

进入正题:数据管道

于是我们开始收集需求。我们认为新的解决方案需要下面这些:

实时处理

保证“至少一次提交”

自带监控和告警等功能

由配置驱动模式之间的转换

可以很容易地增加新字段和转换

差不多是在相同的时间点,我们已经在做依靠分布式发布/订阅消息系统Kafka的新数据管道了。这个数据管道可以直接满足我们的前三个需求。那我们剩下的工作就是构建一个转换框架来满足后面的两个需求,并且作为到Salesforce.com的连接器。

Salesforce数据管道集成方法

转换器(Transformer)

我们采用了一个名为PaaStorm的、在Yelp Hackathon上产生的并且已经应用到生产环境的项目作为我们的Kafka-to-Kafka处理器,叫这个名字是因为它与Storm很相似,并且是用Yelp PaaSTA部署的。

在保留了Storm的范式的前提下,我们构建了一个通用的转换器,可以生成许多实例,处理各个Topic中的要发往Salesforce的原始数据。在处理源Topic时,每个实例都会从一个YAML文件中得到转换步骤,然后再做拷贝、移动和(或)值映射操作。这很重要,因为Salesforce的模式存在的时间远远长于我们的新架构,是很难改动的。这个也意味着没有什么比较自动的方法去做字段映射。如果有一个由配置驱动的映射关系,那就让我们可以快速对转换做解析而不必真正地部署代码。这对项目的灵活性至关重要。

每个转换器都会向一个新的Kafka Topic中发布序列化后的Salesforce对象,供上传器消费并发往Salesforce。

上传器(Uploader)

让上传器自己做为一个实例就可以让我们清楚地知道我们有多少个服务要与Salesforce.com通信。上传器会消费各个转换器转换后的消息,将它们批量发送到Salesforce。因为发往Salesforce的请求是发向互联网的,所以这是我们的管道中最慢的部分之一。因而能不有效地进行批量处理就对性能影响非常大。使用合适的API也非常重要。因为Salesforce提供了好多种不同的API,所以有时候选择用哪个也不容易。

为了让我们可以很容易地不必做更多的工作就在API之间切换,我们写了个通用的客户端,用它包装了现有的SOAP、REST和批量API等Python客户端。我们也写了一个ORM(Object-Relational Mapping)客户端,并为每张我们要写的目标表定义了模型。这让我们可以在发送到Salesforce.com之前就对数据进行验证,并且在写数据的时候确认该用哪个Salesforce External ID。

评估

我们用来评估的第一张表就是我们的广告商表。选它是因为它是我们Yelp整体业务中规模比较小的表之一,但对于我们的销售团队的运作又是非常重要的。在之前要把数据变更操作从Yelp同步到Salesforce需要16小时。在切换到新架构之后,可以看到同步时间变成了平均约10秒钟,偶尔会有一些波峰,但也就只是变为几分钟而已!这就足以表明出现在Salesforce上的数据是非常可靠的,销售团队在查看数据时,再也不必不断地来要更新的数据了。

每秒钟平均同步延迟

挑战

听起来很容易?事实上并非如此。在设计和构建我们的连接器的过程中,我们要解决好几个问题:

我们注意到在处理失败的更新操作中有很大部分都是在Salesforce一侧超时了,或者是由于没能成功的为某行数据获取锁而被拒绝了。这两种问题的根本原因都在于我们在Salesforce的程序中使用了大量的触发器和回滚操作。差不多每张表上都有非常复杂的逻辑,而每一条写操作都要把这些逻辑全处理一遍,以保证不同数据之间的一致性,或者为了将某些业务流程自动化。

这些功能本来都是很好的,但碰上问题时你就不那么想了。所以我们工作的重点就是要减少做写操作时的处理量。把这样的处理尽可能地挪到异步处理的过程中,就可以减少我们锁定单条记录的时间,也就减少了每条写操作的处理时间。

另一个要解决的问题是依赖关系。我们本来的数据源(MySQL)有限制依赖,而Kafka并没有。虽然写到每个Kafka Topic中的消息都是保证有序的,但是我们并不能保证这些Topic中的数据会以某个确定的速度被处理。在各张表都彼此依赖的情况下这个问题就很严重,因为一张表中的数据可能会比另一张表的数据更先被读取和更新,导致数据在一定时间内处于不一致的状态。

一个常见的例子就是广告商的数据记录会比用户的数据稍早到一会。因为广告商的数据中包含一个指向User表的外键字段,写入就会失败。因此我们就要跟踪哪些数据是因为不符合依赖约束而写入失败的,然后再由上传器在确定依赖关系满足了之后再重试。把上传的操作按依赖顺序序列化并处理重试,这可以覆盖我们绝大多数的用例,尽管这意味着我们因此没办法达到一个很高的并行度。

还有一个问题,就是我们的数据没有全放在一个单一的数据库里面,所以对我们来说可用的就是单条的数据记录。为了解决这样的问题我们开发了新功能,读两个Topic中的数据并把它们关联起来,然后再把关联后的数据重新发布出去。

结论

使用基于Kafka的数据管道来为销售团队获取数据,我们已经在这方面取得了很大改进。接下来我们准备构建自己的基础架构,这样就可以实现其他的转换操作、简单的聚合、以及在写Salesforce的高可靠保障等等功能。

小树分割线

第六篇:近实时地将Kafka中的数据流入Redshift

Yelp的数据管道给了开发者一套工具集,来轻松地把数据在公司内搬来搬去。目前为止我们标出了核心数据管道基础架构的三个主要部分。第一个是MySQLStreamer,它把MySQL上的操作复制出来并以流的方式发布到基于模式的Kafka Topic中。第二个是Schematizer,在这里集中化地保存了各个Kafka Topic中的真实信息。它以持久化存储的方式保存了在某个特定的Topic中用于编码数据的Avro模式信息、数据的属主、各个字段的文档等。最后,是流处理器PaaStorm,它让我们可以更容易地消费数据管道中的数据,做转换、再发布回数据管道。把这些工具一起使用,就可以得到各自关心的数据了。

我们这篇文章关注的焦点是数据管道图的“目标”框。在其中可以看到,在转换后的数据被各后续服务消费之前,需要保存到目标数据库中。

在把尽可能多的数据传输和转换操作抽象到数据管道基础架构中之后,我们仍然要把Kafka Topic中的数据与服务使用的最终数据库关联起来。每一种数据库都有它自己的特性,所以都需要各自不同的连接器。

Yelp用来做数据分析处理的工具中有一个非常重要而且应用广泛的就是Redshift。它因为具有列存储、易扩展、适用于在几十亿行规模的大表间进行复杂的联合查询操作等特性,让计算和聚合操作都变得非常容易。这就让Redshift成了一个分析师、数据科学家和工程师们都要使用的非常棒的数据仓库,要经常交互式地进行各种复杂查询、获得输出。本文主要关注Redshift连接器:一个使用PaaStorm从Kafka中读取数据,并把数据导到Redshift集群中的服务。

数据仓库已死!数据仓库永生!

我们旧的ETL系统在线服务了很多年,不断地把数据从实时生产数据库搬到数据仓库中。下图显示了这套系统的概要架构。

我们在MySQL数据库有很多触发器,用于监控各种表上面的数据改动。每当有一条记录发生改变时,我们都会写一条改动日志。一个监控改动日志的工作进程就会生成ETL任务,把它们发布到我们的工作队列中。然后,有许多个工作进程就会根据各种不同的ETL逻辑对各行数据做定制化的转换操作。转换操作的输出结果会被写入MySQL,在那里暂时保存,然后再导出到S3,最后COPY到Redshift。

这个系统帮我们把数据从MySQL搬到Redshift一直工作得很好,不考虑复杂度,在我们日积月累地加上了各种告警和工具集之后它还是挺健壮的。可是,它有一些非常严重的缺点,让它难以扩展。

有个非常大的原因是需要占用开发者非常多的宝贵时间去写定制的ETL、相关的单元测试、做代码审查、整合到我们的单体代码库等。还有,每当我们关注的源头的表的任何一个字段发生了改变时,我们都要在MySQL端和Redshift端做模式转换,又要在开发环境做,又要在测试环境做。

最重要的原因是这套系统是只支持一个Redshift集群的,也就是说每个团队都得为他们自己关心的数据构建一套系统来把数据导到Redshift集群中去。我们已经到了一个临界点,必需要用一套新的系统,以便有更好的扩展能力来适应我们公司规模不断扩张的需要了。

Redshift连接器: 新的希望

Redshift连接器概要视图

要解决我们的旧Redshift导入系统的问题,新系统必须有以下功能:

不需要写定制的ETL就可以写入新表

自动适应模式转换

快速写入

可以从失败中优雅地恢复

幂等地写操作

支持多个Redshift集群

根据我们使用旧的ETL系统的经验,Redshift非常适合做从S3的COPY操作:需求是每天写入几百万行数据,以行为单位的插入操作并不合适。因此,我们需要新的系统来完成两个主要任务:一个要把数据写进S3,另一个把数据从S3读出来,再写入Redshift。

幸运的是,PaaStorm的Spolt已经提供了必要的抽象,来完成这两个任务。用Spolt可以从指定的Kafka Topic中读出消息,按某些方式处理过之后,再发往下游进行后续处理。一个S3的Spolt可以从上游的Kafka Topic中读出消息,把它们组成小的批量,再写入S3。在每次写入S3之后,S3 Spolt可以再向Kafka中发送一条消息,记下向S3中写入了什么数据。然后,这个下游的Topic就可以被当成一个状态记录,记下哪些消息已经被成功的写入S3了。

然后,再用一个Redshift的Spolt去把S3 Spolt发布到Kafka中的消息读出来,也就知道了该向Redshift中写入哪些S3的数据。这样,把数据写入S3的系统和把数据写入Redshift的系统就可以相互独立的运行,只是使用Kafka来做通信的通用协议而已。

S3 Spolt

S3 Spolt用于从数据管道上游的Kafka Topic中读出数据,并把数据写入S3的文件之中。下图简要地展示了S3 Spolt的功能。

上游的Kafka Topic中可能是从MySQL表中出来的原始行数据,可能是一个开发者批量写入数据管道的原始消息,也可能是某个中间环节的Spolt对这些原始数据做的一系列转换的一个中间结果输出。

批量写入S3

Redshift非常适合于从S3中批量地把多个文件并行导入Redshift的场景。为了利用这个特性,S3 Spolt会把消息分批,把它们以一个CSV文件的方式写入S3的一个Key。Key由Topic、Partition和这一批消息的第一条在Topic中的偏移量等信息决定。或者当消息条数达到限制时,或者是时间达到设定值时,或者是上游消息中带了一个新的schema_id时等,S3 Spolt就会把一批新消息发布到S3之中。一批的消息条数上限和时间上限都是可配置的参数。

S3 Spolt会把它写入S3的数据做一些转换。它会为每条消息加一个“消息类型”字段,用于描述这条消息代表了哪一类数据操作(Create、Update、Delete或者Refresh)。它也会为每条消息再附加一个字段,值是上游的Topic中对应的Kafka偏移量。然后,每一批消息都会用一个结构经过了仔细设计的csv.Dialect对象打包到一个CSV文件中,这样在避免了解析不同格式的同时,也让对输入的CSV文件的解析变得更容易。然后数据会被按照对应的Key写入S3。

向Redshift Spolt发信号

当S3 Spolt向S3的一个Key下写了一批消息之后,它就需要一个方法来向Redshift Spolt发信号,通知它这个事件。S3 Spolt会向Kafka中写一条消息,包含如下有关S3中的数据的信息:

上游Topic的名字

上游Topic中的消息对应的起始和终止消息的偏移量

用于写数据的CSV的格式

S3上数据的路径

与写入S3的数据对应的模式ID

消息最终会发布到一个Topic中,Redshift Spolt会从中读数据。

检查点与恢复

因为不可避免地会有一些偶尔的停服或网络故障,S3 Spolt应该在故障恢复后可以立刻恢复工作。我们并不希望S3 Spolt每次重启时都从那个Topic的最早的偏移量开始恢复工作。我们也不能容忍丢失数据,所以我们需要知道我们上次工作做到哪里了。S3 Spolt处理这样故障的方式很优雅。重启时,它会起一个轻量级的消费者线程,读读这个Spolt的下游Topic的最后一条消息。

最后一条消息中包含了上游Topic的消息以及它最后的偏移量,S3 Spolt可以用这些信息来在必要时找回它在一个Kafka Topic中的位置。而且,写入S3的文件是幂等的,所以即使写Kafka失败,S3 Spolt在重启时还会替换这个文件。这就为避免重做工作或者丢消息提供了一个保护机制。

Redshift Spolt

Redshift Spolt从S3 Spolt写消息的Topic中读数据,找到S3上数据存储的位置,再把这些数据写入Redshift集群。每个Redshift集群都有特定的Redshift Spolt。下图显示了Redshift Spolt的概要设计。

批量创建S3清单文件

与S3 Spolt写入S3时用的批量策略类似,我们也会利用Redshift的一次从S3中复制多份文件的特性。Redshift Spolt读的每一条Kafka消息中都包括着一批S3文件的信息,因此,我们也可以把一定数量的这些文件堆积起来,再通过S3清单一次把它们拷过来。

比如,Kafka中的一千条消息代表着S3中的一万条记录,让我们可以用一条COPY命令就把它们对应的一千万条数据记录拷过来。只要我们用模式ID来做数据批量,在同一批的所有文件中的所有记录就都是用完全相同的格式的。我们用它们的模式ID缓存消息,在达到了指定的消息数或者达到了在缓存中累积消息的超时时间之后,就把它们刷出去。

在刷出一个缓存中的数据时,我们先提取每条消息对应的S3路径,再把它们写到S3上的一个唯一的清单文件中。因为S3在写一致性方面没有list功能,这个清单文件在我们读取所需文件并拷到Redshift上时,在保证强一致性方面起着至关重要的作用。然后,Redshift Spolt就把这个清单文件的位置发给Redshift写入者。

做模式转换

Schematizer提供了一系列的功能来把Avro模式转换成Redshift表的CREATE语句。在有一些新文件要拷到Redshift上时,Spolt会检查三种情况:目标表不存在、目标表存在但是模式不同、目标表存在而且模式也相同。

当Redshift Spolt开始写入数据时,它会检查一张状态表,其中包含了每张表与它所用的模式ID的映射,以此判断我们在Redshift中是否已经创建了对应的目标表。如果没有,我们会利用这个模式ID,让Schematizer去生成一个CREATE TABLE语句。

如果目标表已经存在,但我们用来创建它的模式ID与当前的写入进程用的不同,这就表明模式已经改变了,这时我们就该生成一系列的操作,来把数据从旧表迁移到新表去。

最后,如果状态表中的模式ID与当前的写入进程用的模式ID相同,我们就什么都不用做,因为目标表的模式正是我们希望的。

分阶段将数据迁移入目标表

当目标表已经存在并且模式也是我们想要的时,我们就需要有方法来把各条记录写或更新进去。我们先用当前任务使用的模式ID来创建一张临时的阶段表。然后,用S3清单文件来让Redshift批量把文件拷到这张临时表中。这样我们就有了一张包含着几百万条(可能是)记录的表,我们要把它合并到目标表中去。

队表中的每一条记录都代表着一个行事件,也就是上游数据源中的Create、Update、Delete或Refresh事件。我们不能简单地把每条记录都直接写到目标表中,因为我们要处理类似Update和Delete这种特殊事件。这样,我们就会做一个合并的策略,策略与Redshift文档中建议的相似,并有少量的修改。这个就增加了唯一主键的约束,而这个在Redshift中并不是原生支持的。

首先,我们只保留每个主键的最大的偏移量的那一行。这样,如果在上游发生了有一系列关于同一行数据的操作我们就只会保留那行记录的最新的值。

对阶段表中剩下的记录来说,我们把目标表中有相同主键的记录全删掉。从直观上来看,我们就在从目标表中删除那些我们想用新内容替换的记录。请注意这也会处理Delete消息,因为那些记录会被简单地删除掉,而没有新东西去替换它们。

接下来,我们把阶段表中与Delete事件对应的消息全删掉,正如从目标表中删除它们一样。

这样,所有剩下的记录所对应的数据在目标表中都不会有相应的数据了,那我们就可以简单地把它们全插入到目标表中,然后再删掉阶段表。这样所有数据就都从阶段表中合并到目标表了。

检查点与恢复

每一次更新插入任务都可能会涉及数百万条记录的改动,耗时数分钟。与S3 Spolt相似,在Redshift由于网络故障发生重启,或者Redshift Spolt失去了与Redshift的连接时,Redshift Spolt都要表现得很健壮,这样我们才不必重复做过的事情。我们用了一张Redshift中的状态表来建立每个Topic和分区到偏移量的映射关系。当Redshift Spolt完成了一次更新插入任务之后,它就会把Redshift状态表中与Topic和分区对应的偏移量设置成这批消息对应的最大偏移量。

这整个更新插入操作和做检查点的操作都发生在一个事务中,所以每次做的任务都是要么全部成功,要么全部不成功,绝不会让Redshift处于某种不一致的状态。这样每当Spolt重启时,它先检查这张状态表,找到它所处理的Kafka Topic的最后位置。

展望未来

Yelp的数据管道已经极大地改变了我们考虑数据的方法。我们处理数据的方式已经从批量移动数据和定时调度任务,转变成了接入数据块的流,以此来构建更实时的系统。事实证明,数据管道底层架构提供的抽象在构建到诸如Redshift和Salesforce等各种目标数据库的连接器时极为有用。到目前为止,数据管道所展示出来的潜能是令人极其兴奋的,我们很高兴地看到在将来,我们可以如何利用这些工具来让Yelp的数据更容易为大家所用。

鸣谢

非常感谢Redshift连接器的合作者Matt K.,以及Chia-Chi L.和Justin C.,他们自始至终不断地为这个项目提供着反馈和指导。也对整个业务分析组和指标组至以最大的敬意,他们构建了整个数据管道架构,才让我们的项目成为可能。

注:转载文章均来自于公开网络,仅供学习使用,不会用于任何商业用途,如果侵犯到原作者的权益,请您与我们联系删除或者授权事宜,联系邮箱:contact@dataunion.org。转载数盟网站文章请注明原文章作者,否则产生的任何版权纠纷与数盟无关。
期待你一针见血的评论,Come on!

不用想啦,马上 "登录"  发表自已的想法.