17611538698
webmaster@21cto.com

Uber是如何使用MySQL设计可扩展性数据存储的?

资讯 0 3121 2017-07-26 11:55:48

uber.png

 
21CTO社区导读:本文原文为DESIGNING SCHEMALESS, UBER ENGINEERING’S SCALABLE DATASTORE USING MYSQL。 原文分为三篇文章进行对主题内容的解读,在此整理结合到一起,方便大家阅读。
Uber是如何使用MySQL设计可扩展性数据存储的(一)
Schemaless的形成:根据Uber工程师的习惯使用MySQL设计的数据存储,使我们可以 从2014 扩容到更高。这是Schemaless三部曲中的第一部。
在 Mezzanine项目 中我们描述了我们是如何将Uber的核心行程数据从单个的Postgres节点迁移到Schemaless,这是我们开发的一个容错性很高、可用的数据存储。
本文将进一步讲述它的架构、它在Uber基础结构中的角色以及他是如何成为该角色的。
我们对新数据库的迫切需求
2014年初,由于出行业务的迅猛增长,数据库空间即将耗尽。每次入住新的城市以及行程的里程碑都会把我们推向危险的境地,直到我们发现到年末时Uber的基础架构将无法继续发挥效用:Postgre并不能存储如此多的行程数据。我们的任务是实现Uber的下一代数据库技术,一个耗时数月甚至几乎整年的任务,大量的来自于我们世界各地研究所的工程师参与进来。
但是首先,在商业与开源选择如此之多的当下,为什么要自己构建一个可扩展的数据库。我们对我们新的行程数据存储有5点关键的需求:
  • 我们新的解决方案需要可以通过增加服务器线性扩容,这是原有的Postgre所缺乏的。添加服务器应该能在增加硬盘存储的同时减少系统的响应时间。
  • 我们需要写入的能力。我们之前通过Redis实现了一套简单的缓冲机制,因此如果Postgre写入失败,我们可以稍后重试,因为行程已经在中间层存入了Redis当中。但是当行程数据在Redis当中时,是不能从Postgre中读取的,然后一些功能就挂了,比如计费。很烦,不过至少我们没有丢失行程数据。随着时间流逝,Uber逐渐成长,我们基于Redis的解决方案不能扩容。Schemaless需要支持一种类似Redis的机制,但最好还是写完即时可读。
  • 我们需要一种机制通知下游依赖。在现有系统当中,我们同时处理多个行程组件(比如计费,分析等)。这种处理方式很容易出错:如果任何一步失败了,我们就比如从头重试,即使一些组件处理已经成功了。这就不能扩容了,因此我们想把这些步骤打碎成独立的步骤,由数据变更发起。我们曾经确实有一个异步事件系统,但是它是基于 Kafka 0.7的。我们没法让它无损运行,因此我们希望新系统有一些类似的机制,但是可以无损运行。
  • 我们需要副索引。由于我们是从Postgre迁移的,那新的存储系统需要支持Postgre的索引,会按照习惯用副索引搜索行程数据。
  • 我们需要运维够信赖的可靠系统,因为其中包含了行程数据的关键任务。如果凌晨3点我们接到叫车请求,但是这时数据存储无法响应查询,导致业务宕机,我们是否有相关操作知识可以快速解决这个问题。

鉴于以上种种,我们分析了几种常用的选择的优势和潜在的限制,比如Cassandra、Riak、MongoDB等。出于说明的目的,我们提供了如下图表,展示了不同系统选择下的不同功能组合:
/uploads/fox/26133632_0.jpg
所有的三个系统都可以通过在线增加节点线性扩容,只有一对系统可以在宕机时收到写操作。所有的解决方案中都没有内置的方式将变化通知下游依赖,因此可能需要在应用层实现该功能。它们都索引功能,但是如果你想索引多个不同的值,查询会变慢,因为他们使用分散查询并聚合结果的方式查询了所有节点。最后对于其中的一些系统有过单集群的使用经验,但不提供面向用户的在线流量,而且在我们的服务连接的时候有各种各样的运维问题。
最终我们的确定运维信赖为主要因素,因为它包含了行程数据的关键任务。 可供选择的解决方案理论上可能都是可靠的,但是我们是否有运维的知识来立即发挥其最大功效,基于此我们决定基于Uber的使用情况开发自己的解决方案。这不仅基于我们使用的技术,而且根据成员的经验。
需要注意的是我们对这些系统的研究持续了两年,没有发现适合行程数据存储的,但是我们已经在其它领域成功接受了Cassandra与Riak作为我们的基础服务,而且在生产环境使用这些为数百万级的用户提供服务。
在Schemaless中我们相信
由于以上的所有选择在规定的时间内都不能完全满足自己的需求,我们决定构建我们自己的系统使运维尽量简单,也参考了其它厂扩容的经验。这个设计的灵感来自于Friendfeed,运维的方面则参考了Pinterest。
最后我们决定构建一个键值存储,允许存储任何JSON数据而不需要严格的格式验证,一个非结构化的模式(命名的由来)。这是一个只扩展分片的MySQL, master节点都带有写缓冲在应对MySQL宕机,数据变更通知是一个订阅-发布的功能,我们称之为trigger。最后,Schemaless支持全局数据索引。下面我们将讨论一下数据模型概览以及一些关键特性,包括剖析Uber的一份行程数据,更深入的例子保留在接下来的文章中。
Schemaless的数据模型
Schemaless是一个只扩展的稀疏三维持久化哈希表,非常类似Google的 Bigtable。Schemaless中的最小数据被称作cell,不可更改;一次写入后不可被覆写或删除。Cell是一个JSON blob通过一个rowkey和一个columnname引用,还有一个referencekey叫做ref key。rowkey是一个UUID,column name是一个字符串,reference key是一个整型。
你可以将row key看作是关系型数据库的主键,column name看作是关系型数据库的列。无论如何,在Schemaless中没有预定义或强制模式而且每行不需要共享column name;事实上,columnname完全由应用层定义。ref key用于给一个指定row key和列加版本。因此如果一个cell需要更新,你只需写一个新的cell附带一个更大的ref key (最新的cell是那个有最大的ref key的)。ref key也可以用作标记一个列表中的实体,但主要用作标记版本。具体哪种形式由应用本身决定。
应用通常把相关的数据组织进同一列,然后每列的所有cell在应用侧的结构都大致相同。这种分组方式很好的把一起修改的数据很好的组织到了一起,这样应用程序就可以在数据库不停机的情况下迅速修改结构。下面的例子进行了更详细的叙述。
实例:Schemaless中的行程数据存储
在深入了解我们如何在Schemaless中对行程数据建模之前,让我们先剖析一下一个Uber的行程。行程数据在不同的时间点产生,从上车下车到付费,这许多信息伴随着用户在行程中的反馈以及后台进程处理异步到达。下图简要说明了一个Uber行程的不同阶段是何时发生的:
/uploads/fox/26133632_1.jpg
这个图表展示了一个我们行程流的简化版。*标志的部分是可选的且可能发生多次。
一个行程是由乘客发起,由司机结束,包含开始与结束的时间戳。这些信息构成了行程的基础,我们据此计算出该次行程的费用,由司机来收费。
行程结束后,我们可能要调整跟收取或发放的费用。我们也可能给行程数据添加备注,从乘客或司机出发出反馈(上图中星号部分标出)。在第一张信用卡超期或禁用的情况下,我们不得不尝试用多张信用卡付款。
Uber行程流是一个数据驱动的过程。随着数据变得有效或添加,特定的一组处理会在该行程上执行。这些信息中的一部分,比如乘客或司机的评级(上图中note部分),可能在行程结束后几天处理。
好了,那我们如何把上述的行程模型映射到Schemaless呢?
行程数据模型
使用 斜体字 标注UUID,大写字母表示column name,下表展示了我们行程数据存储的简化版的数据模型。
我们有两个行程(UUIDstrip_uuid1 和 trip_uuid2) 以及四列(BASE, STATUS, NOTES, and FARE ADJUSTMENT)。一个格子表示一个cell,带有一个数字以及一个JSON的 (以{…}缩写)。格子的覆盖代表不同版本 (也就是不同的ref keys)。
/uploads/fox/26133632_2.jpg
trip_uuid1 有三个cell:一个在BASE列,两个在STATUS列,FARE ADJUSTMENT列没有内容。trip_uuid2的BASE列有两个格子,NOTES列有一个,同样的FARE ADJUSTMENTS列也没有内容。在Schemaless中,列没什么不同;每列的语义都由应用层定义,本例中是 Mezzanine。
在Mezzanine中,BASE列的cell包含了行程的基础信息,比如司机的UUID和行程的时间。STATUS列包含行程现在的支付状态,每次我们尝试对行程支付的时候都会插入一个cell (由于信用卡额度不足或者逾期等问题尝试可能失败)。如果司机或者Uber的DOps(司机调度员)有行程相关的备注,会在NOTES列添加一个cell。最后的FARE ADJUSTMENT列的cell记录了行程价格的调整。
我们如此划分列是为了避免数据冲突 而且最小化更新时需要写的数据量。BASE列在行程结束时写入,基本只会写一次。当行程开始尝试支付的时候开始尝试写STATUS列,此时BASE已经写好了,如果支付失败可能会写多次。相似的NOTES列在BASE列写过后的一些节点可能会写多次,但是与STATUS列的写操作完全独立。类似的FARE ADJUSTMENTS列只在行程费用变更时会写,例如路况不好等原因。
Schemaless触发器
Schemaless的一个重要特性是触发器,提供了在Schemaless实例变更时可获得通知的能力。由于cell是不可变的,以及新的版本是追加的,所以每个cell都代表了一个修改或者一个版本,这允许一个实例的值变更可以像日志一样查看变化。对于一个给定实例,可以监听这些变化以及触发基于这些变化的函数,非常像类似Kafka这种事件总线系统。Schemaless的触发器使Schemaless成为一个完美的真实来源的数据存储,因为除了随机访问数据,下游的依赖可以运用触发器功能来监听并触发任何应用侧特定的代码(与LinkedIn’s的 DataBus类似),进而实现数据创建与数据处理的解耦。
在其它用例中,Uber在BASE列写入Mezzanine实例后,使用Schemaless的触发器来进行结账操作。针对上面的例子,当trip_uuid1的BASE列被写入后,我们的支付服务被BASE列的触发器触发,获取这个cell,然后尝试用信用卡支付该行程。无论成功与否,信用卡支付的结果都会回写如Mezzanine的STATUS列。通过这种方式实现了支付服务于行程创建的解耦,Schemaless扮演了一个异步事件总线的角色。
/uploads/fox/26133632_3.jpg
索引的易用性
最后,Schemaless支持在JSON blob中的字段上定义索引。当这些预定义的用于找到cell的字段与查询的参数相匹配时,就会用到索引。索引查询效率很高,因为索引查询只需要访问一个单一的分片来找到需要返回的cell的集合。事实上,查询还可以更深度的优化,因为Schemaless允许非标准化的cell数据直接加入索引中。
索引中含有非标准化数据意味着索引查询在查询和取信息操作一起只需要查询一个分片。事实上,我们通常推荐Schemaless用户在可能需要的地方都把非标准数据加到索引当中,除非只需要直接用row key查询并取回cell。在某种意义上,这样一来我们用存储交换来了快速查询的性能。
作为Mezzanine的一个例子,我们定义个了一个副索引来查询指定司机的所有行程。我们将行程的创建时间以及行程发生的城市非标准话加入到索引中。这样就可以查询一个司机在一个城市中一段时间中的所有行程。
下面我们给出了driver_partner_index YAML 格式的定义,这是行程数据存储的一部分,定义在BASE列上 (这个例子用标准#符号添加了注释)。
table: driver_partner_index # Name of the index.datastore: trips # Name of the associated datastorecolumn_defs: – column_key: BASE # From which column to fetch from. fields: # The fields in the cell to denormalize – { field: driver_partner_uuid, type: UUID} – { field: city_uuid, type: UUID} – { field: trip_created_at, type: datetime}
使用这个索引,通过筛选city_uuid或者trip_created_at,我们能够找出指定driver_partner_uuid的行程。在这个例子中我们只用到BASE列的中的字段,但是Schemaless支持从多个列中非标准化数据,相当于上面column_def列表中的多个实例。
像上文提到的Schemaless高效的索引得益于基于分片字段将索引分片。因此一个索引的唯一需求是索引中有一个字段是分片字段(例如上例中最先给出的driver_partner_uuid)。该分片字段决定了索引实体应该在哪个分片写入或者读取。原因是我们在查询索引的时候需要提供分片字段。这意味着在查询时,我们只需要查询一个分片来获取索引实体。关于分片字段有一点需要注意的是要选一个分布好的字段。UUID最佳,其次是city ids,不要选状态字段(枚举值)。
除分片字段外,Schemaless还支持相等、不等以及范围查询的过滤器,同时支持只查询索引字段的一个子集以及根据索引实体指向的row key获取特定列或所有列。现在分片字段必须是不可修改的,因此Schemaless只需跟一个分片交互,但是我们正在探寻如何在没有太大性能开销的情况下让他成为可变的。
索引具备最终一致性,无论何时我们写入一个cell,我也更新这个索引实体,但是这不发生在同一个事务中。Cell与索引实体通常不在同一个分片上,因此如果我们想要提供一致的索引,就需要在写入操作中引入 2PC ,这会明显加大开销。通过最终一致性的索引,我们避免了这项开销,但是Schemaless的用户可能会看到过期的数据。多数情况下cell变化与相关索引变化之间的延迟能控制在20ms之内。
总结
我们给出了一个数据模型、触发器以及索引的概览,这些都是Schemaless的关键功能,我们行程存储引擎的主要组成部分。在后续的文章中,我们将看到一些Schemaless的其它特性来阐明在Uber的基础设施中,它是如何成为服务的好伙伴的:更多的架构,使用MySQL作为一个存储节点,以及我们如何使触发器在客户端成为可容错的。
Uber是如何使用MySQL设计可扩展性数据存储的(二)
Uber底层如何与Schemaless协同工作,这个数据存储基于MySQL从2014年10月帮助Uber的工程师扩容。这篇是Schemaless三部曲中的第二部:第一部是Schemaless的设计。
在 Mezzanine项目: Uber伟大的迁移,我们讲述了我们是如何将Uber的核心行程数据从一个单一的Postgres实例迁移到Schemaless的,这是我们开发的可扩容、高可用的数据存储。然后我们给出了一个Schemaless的概览?—它的开发的决定过程,以及数据模型概览—而且介绍来一些特征比如Schemaless的触发器和索引。这篇文章覆盖了Schemaless的架构。
Schemaless概要
回顾一下,Schemaless是一个可扩展且能容错的数据存储。基础的数据实体称作cell,是不可变的,一次写入之后不可被覆写(在一些特殊的情况下,可以删除旧的记录) 。一个格子由一个row key,columnname,以及ref key定位。一个cell通过写入一个带有更大的ref key和相同的row key和column name的新版本来更新。
Schemaless对内部存储的数据的结构没有强制要求,并以此得名。从Schemaless的观点看,他只存储JSON 对象。Schemaless独特的支持基于cell的字段构建的高效的具备最终一致性的副索引。
架构
Schemaless的节点分为两类:工作节点和存储节点,既可以在同一个物理机/虚拟机上,也可以分开。工作节点接受到客户端的请求,把请求分散到存储节点,并把结果聚合。存储节点存储数据的方式使单个存储节点上的单个或多个cell的查询非常快。我们把两种节点分开来让两部分各自独立的扩容。下图是Schemaless架构概览:
/uploads/fox/26133632_4.jpg
工作节点
Schemaless的客户端通过HTTP协议与工作节点交互。工作节点将请求路由到存储节点,并根据需要将存储节点返回的结果聚合,并处理一些后台工作。为了解决运行慢或宕机节点的问题,客户端的库文件会显示的尝试其它主机,以及重试失败的请求。写操作对Schemaless是幂等的,因此所有的请求都可以安全的重试 (这真是个不错的特性)。这个特性被客户端类库充分的利用了。
储存节点
我们将数据集合划分入一个固定数量的分片(通常设置为4096),然后映射到存储节点上。一个cell根据它的row key映到到一个分片。每个分片都会按配置的数量复制到多个存储节点上。这些存储节点共同组成一个存储集群,每个节点都有一个主节点与两个从节点。从节点(也称为副本)分布在多个数据中心为机房故障提供冗余。
读写请求
当Schemaless处理一个读请求时,比如读取一个cell或者请求一个索引,工作节点可以从集群内的任何存储节点读取数据,具体从主节点还是从节点读取数据是在一个的底层配置的,默认读取主节点,这意味着客户端可以看到它写请求的结果。写请求(插入cell的请求),只能操作这个cell所属集群的主节点。当更新完主节点的数据,存储节点会异步的将这个更新复制到集群的从节点。


错误处理


分布式数据存储一个有趣的方向是如何处理故障,比如请求返回失败(主节点或从节点)。Schemaless设计的目的就是最小化存储节点读写请求失败造成的影响。


读请求


主节点与从节点的设置意味着只要集群里有一个节点可用就可以提供服务。如果主节点可用,Schemaless总是可以通过查询返回最新的数据。如果主节点宕机了,有些数据可能还没有复制到从节点,因此Schemaless返回得可能不是最新的数据。在生产环境中,复制的延时基本都是亚秒级的,因此从节点的数据基本都是最新的。工作节点对存储节点连接的管理采用 断路器模式,当一个存储节点宕机时,会自动寻找新的节点。通过这种方式,读取任务在故障时转移到了另一个节点。


写请求


从节点宕机并不影响写操作,写请求发往主节点,但是如果主节点宕机,Schemaless同样接受写请求,但是他们会在其它(随机选择的)主节点上实例化。这与Dynamo 或 Cassandra的hinted handoff机制很像。写往其它的主节点意味着随后的读请求在主节点恢复或者从节点升级为主节点前读不到这些写入结果。事实上,Schemaless在处理异步故障的时候都是通过写其它主节点解决的,我们称之为技术缓冲写入(将会在下节中详述)。
使用单一节点负责写入会产生一些优点与缺点。一个优势是对于每个分片的写入操作构成一个 全序 ,这对Schemaless的触发器来说非常重要,我们的异步处理处理框架(这在Schemaless系列文章中的第一篇有提及),因为这样它可以从任何一个节点读取该分片的数据,并能保证处理的顺序。集群中所有节点的cell的写顺序都是一致的,因此在一些情景下Schemaless的分片可以看作一份分区cell的修改日志。
单主节点最突出的缺点是:如果集群中主节点宕机了,我们将数据缓冲写入别的地方,但是不可读。这种麻烦情况的优势在于:Schemaless可以告知客户端,master节点宕机了,因此客户端知道刚刚写入的cell不是马上可以读取的。
缓冲写入
由于Schemaless使用MySQL异步复制,如果一个主节点收到一个写请求,并将写请求实例化,但是在复制到其它从节点的时候宕机了(比如硬件故障)。未解决这个问题,我们使用一项称作缓冲写入的技术。缓冲写入通过将数据写入多个集群来最小化数据丢失的概率。如果一个主节点宕机了,数据对接下来的读请求是不可用,但是还是先被实例化下来。
通过缓冲写入,当一个工作节点收到一个写入请求,他将请求写入两个集群:一个副集群和一个主集群(按这个顺序)。仅当两个写入都成功了时才会告知客户端写入成功。请看下表:
/uploads/fox/26133632_5.jpg
主集群的主节点是接下来的读请求期望读取数据的地方。如果主机群主节点在异步MySQL复制将cell复制到主集群从节点之前宕机了了,副集群主节点暂时充当数据备份。
副集群主节点是随机选择的,写操作写入一个特殊的缓冲表。一个后台的进程监控主集群的从节点来查看何时cell出现,仅当那是从缓冲表删除该cell。存在副集群意味着数据至少写入了两台主机。附带一下,副集群主节点的数量是配置的。
缓冲写入利用幂等性,如果一个带有指定row key,column name和ref key的cell已经存在,这个写入会被拒绝。幂等性意味着如果缓冲的cell有不同的row key,columnname和ref key,当主集群主节点恢复时会写入主集群。从另一方面说,如果多个带有相同的row key,columnname以及ref key的写操作被缓冲,他们中的只有一个可以成功,当主集群恢复的时候,其它的会被拒绝。
使用MySQL作为存储后端
Schemaless的强大(与简单)来源于存储节点中使用了MySQL。Schemaless本身只是在MySQL之上加了一层薄的封装用于将请求路由到正确的数据库。通过使用MySQL InnoDB内置的索引和缓存,我们获得了cell及副索引查询的高性能。
每个Schemaless分片是一个独立的MySQL数据库,每个MySQL数据库服务器包括一系列MySQL数据库。每个数据库包含一个盛放cell的MySQL表(称作实体表)以及每个副索引各有一张表,另外还有一组辅助表。每个Schemaless的cell是实体表中的一行,并有如下MySQL表定义:
/uploads/fox/26133632_6.jpg
added_id 列是一个自增的整数列,而且是实体表的MySQL主键。使用added_id作为主键使MySQL在磁盘上线性写入cell。此外added_id为每个cell提供了一个唯一的指针,因此Schemaless的触发器可以有效地使用它按插入顺序提取数据。
而 row_key, column_name, 和ref_key 三列即是每个Schemalesscell的row key、columnname和ref key。为了通过这三列高效地查找cell,我们在这三列上定义了一个MySQL联合索引。因此我们可以高效根据给定row key和column name找到所有cell。
body列使用压缩过的MySQLblob格式存储了cell的JSON对象。我们试验了各种编码和压缩算法,最后出于速度和体积的考虑决定使用MessagePack 和ZLib (详细内容将会在后面的文章中讨论)。最后,created_at列用于存储cell插入的时间戳,因为Schemaless的触发器会查询一个给定时间节点之后的cell。
基于这些配置,我们使用客户端控制结构,而无需修改MySQL的中的表结构;而且可以高效的寻找cell。此外added_id列使插入线性写入到磁盘上,据此我们可以像分区日志一样高效的操作数据。
总结
Schemaless如今是Uber底层一大批服务的生产中的数据存储。我们很多服务高度依赖Schemaless高可用及可扩容的特性。
Uber是如何使用MySQL设计可扩展性数据存储的(三)
本文主要介绍Schemaless triggers的细节及相关案例,从2014年10月开始,通过这种方式提供给Uber在数据存储方面伸缩的能力。这是本系列文章的第三部分,第一部分是关于Schemaless总体的设计,第二部分是关于其架构的讨论。
Schemaless triggers是一种具有可伸缩性、容错性和无损性的技术,它可以监听Schemaless实例的变化。它是隐藏在行程背后的流程管理引擎,从司机的领航员按下“结束行程”并支付了行程费用开始,所有相关的数据都进去我们的数据仓库等待我们分析。在Schemaless系列的最后一篇中,我们将深入探讨Schemaless triggers的相关功能,以及我们如何让这个系统具备可伸缩性和容错性。
总的来说,在Schemaless中最基本的单元数据被称为单元(cell),它是不可变的,一旦写入就不能再重写(当然在特殊情况下,我们可能会删除旧记录)。一个单元可能被行键(row key)、列名称(column name)及引用键(ref key)所引用,单元的内容通过写入一个更高版本的引用键来进行更新,其中行键和列名称保持不变。
Schemaless不对存储在其内部的数据执行任何的类数据库的schema操作,所以这也正是为什么叫Schemaless的原因,从Schemaless的观点来看,它仅仅存储JSON对象。
Schemaless Triggers案例
让我们看一下实际中Schemaless trigger是如何工作的,下面的代码展示了我们是如何异步计费的一个简化版本,大写表示Schemaless 列名称。案例中使用的是python语言:
我们通过添加一个注解符@trigger定义一个trigger,这个注解可以加在一个函数上,也可以添加在Schemaless 指定的列上。这样的做法让Schemaless triggers去调用指定的函数,在本例中是指bill_rider,当一个单元被写入一个指定的列时将被触发调用。这里的BASE是一个列,当一个新的单元写入到BASE时表明行程已经结束。然后就会触发trigger,这时行键(这里是行程的UUID)就会被传递到函数中,如果需要更多的数据,程序员必须从Schemaless 实例中获取其他的实时数据,本例是从行程存储系统Mezzanine中获取数据。
下面的图片展示了bill_rider的trigger相关的信息流(乘客结账部分),箭头指向表明了调用方和被调用方,旁边的数字表示流程的顺序:
/uploads/fox/26133632_7.jpg
首先行程进状态写入到Mezzanine,这会使 Schemaless Trigger框架调用bill_rider,在调用时,函数要求行程存储获取STATUS列的最新版本信息,在本例中is_completed字段不存在,也就是意味着乘客还未结账,接着再BASE列的行程信息将被获取并通过函数调用信用卡provider进行结账。在本例中,我们成功地使用信用卡进行付款,所以我们将成功状态写回到Mezzanine中,然后将STATUS列中的is_completed字段设置为true。
Trigger框架能保证bill_rider被每个Schemaless 实例的每个单元至少调用一次。一般而言trigger函数只会被触发一次,但在某些出错的情况下可能会被调用若干次,这个错误可能是trigger函数本身的错误也可能是trigger函数外部的错误。这也就意味着trigger函数需要被设计成具备幂等性,在本例中,幂等性可以通过检查单元是否已经被处理完毕来实现,函数检测到如果已经处理完毕了则可以直接返回。
当你在查看Schemaless 如何支持类似本案例的流程时,请记住这个案例。我们将会解释Schemaless 如何被当做变更日志来使用的,并且讨论Schemaless 相关的一些API,最后还会分享我们是通过什么技术让流程变得可伸缩和具备容错能力。
把Schemaless 当做日志
Schemaless 包含所有单元,这也就意味着它包含了指定的行键、列键对的所有版本。也真是因为它拥有单元所有的历史版本,Schemaless 除了可以作为随机访问的key-value存储外,它还可以作为变更日志。事实上,它是一个分区日志,每个切片都是自己的日志,如下面图所示:
/uploads/fox/26133632_8.jpg
每个单元都通过指定的行键(这里是指UUID)进行切片映射后写入特定的切片,在每个切片中,所有单元都会被赋予一个唯一的标识符,这个标识符叫已添加ID。已添加ID是一个自动递增的字段,它代表单元插入的顺序,越是新的单元就会有一个越新的已添加ID。除了刚刚提到的已添加ID外,每个单元还会有单元写入时间字段。在所有分片副本中已添加ID具备唯一性,这个特性对于提供failover能力是非常重要的。
Schemaless 的API既支持随机访问也支持日志类型访问,随机访问API是相对于单元而言的,它由row_key、column_key和ref_key三者共同标识。


put_cell (row_key, column_key, ref_key, cell):
// 通过给定的row key、column key和ref key插入一个单元
get_cell(row_key, column_key, ref_key):
//通过指定的row key、column key和ref key获取指定单元
get_cell_latest(row_key, column_key):
// 通过指定的row key和column key获取具有最高版本号ref key的单元
Schemaless 还包含这些API终端的批处理版本,这里我们省略它。早前说过的trigger函数bill_rider就是使用这些函数去获取和操作一个单元的。
对于日志类型访问API,我们主要关心单元的切片数字、时间戳和已添加ID,这三者合起来称定位的位置。
get_cells_for_shard(shard_no, location, limit):
// 从“shard_no”切片返回在“location”后的不多于“limit”个单元


与随机访问API类似,日志访问API拥有更多方法让我们一次性从不同的切片中去批量获取多个单元。其中的location可以使时间戳timestamp或已添加ID added_id。调用get_cells_for_shard除了返回指定的单元外还会返回下一个已添加ID。例如,如果你通过指定location为1000去调用get_cells_for_shards请求了10个单元,那么返回回来的下一个location的位置偏移量就是1010。
追踪日志
通过日志类型访问API你可以追踪Schemaless 实例,这个看起来就像在你的操作系统中通过tail -f命令去追踪一个文件,或者像kafka这样最新的变更会被轮询的事件通知队列。客户端然后通过维护保持跟踪位置偏移量进而使用它们去轮询。想要开启一个跟踪你要指定起始入口,例如location为0,或者任意的时间戳,或者某位置偏移量。
Schemaless triggers通过使用日志类型访问API实现了相同机制的跟踪,它保持跟踪位置偏移量,通过轮询该API方式的最直接的好处就是Schemaless triggers使处理过程具有容错性和可扩展性。
通过配置Schemaless实例及配置哪些列去轮询数据,然后就可以通过客户端程序去连接Schemaless triggers框架。所有的函数和回调函数都被绑定到框架的数字流上,在适当的时刻将被Schemaless triggers调用,或者说被触发,而这个适当的时刻就是当一个新的单元被插入到Schemaless实例时。
作为回报,框架会将增加运行在主机上的程序需要的工作进程编号。框架优雅地通过可用进程和不可用进程进行分工,将失败的进程上面的工作传播到健康进程去处理。这种分工模式意味着程序员只需编写好处理者即可,例如trigger函数,只要保证这个函数是幂等性的就可以了,剩下的就交给Schemaless triggers来处理。
架构
在这部分中,我们将讨论Schemaless triggers是如何做到可扩展,如何做到让故障影响最小化。
下面的图从一个高层次的角度展示了其架构,拿了前面结账服务的例子:
/uploads/fox/26133632_9.jpg
结算服务使用了运行在三个不同主机上的Schemaless triggers,为简单起见,我们假设每个主机上有一个工作进程,Schemaless triggers框架将切片从多个工作进程中分开,所以每个工作进程负责一个特定的切片。
注意到,工作进程1从切片1拉取数据,而工作进程2则则负责切片2和切片5,最后工作进程3负责切片3和切片4。一个工作进程只处理指定切片的单元,通过获取新的单元去调用这些切片上注册上来的回调函数。其中一个工作进程被指定为leader,它负责分配切片给各个工作进程。如果有一个工作进程挂了,leader将失败进程的切片重新分配给其他工作进程。
在一个切片中,单元都是按照写入的顺序被进行触发的,这也就意味着如果某个触发单元总是因为程序错误而总是失败,那它就会在相应的切片上阻碍单元处理。为了避免这种延迟,你可以配置Schemaless triggers标记多次失败的单元,并将他们放到单独的队列中。这样做以后Schemaless triggers就会跳过出错的单元接着处理下一个单元。如果被标记的单元超过了某一阀值,trigger就会停止,这通常表明是系统错误,需要人工进行修复。
Schemaless triggers通过保存每个切片最新成功被触发的单元的已添加ID去跟踪整个触发过程,框架将这些位置偏移保存在一个共享存储中,例如zookeeper或者Schemaless 实例本身。这也就意味着如果程序被重启了,trigger将会从公共存储中读取获取位置偏移量后继续运行,公共存储同样用于保存一些meta信息,例如协调leader选举的工作,工作进程的发现及移除。
可扩展性和容错性
Schemaless triggers在刚开始设计时就充分考虑其可扩展性,对于任意客户端程序,我们可以在被追踪的Schemaless 实例中添加最多与切片数量相等的工作进程,通常这个数量为4096。
除此之外,我们可以在线添加或移除工作进程来处理Schemaless 实例中其他trigger客户端变化的负载。仅仅通过跟踪框架里面的进度,我们就可以给发送数据的Schemaless 实例添加尽可能多的客户端。在服务器端没有跟踪客户端并推送状态给他们的逻辑。
Schemaless triggers同样也具备容错性的,任何一个进程发生故障都不会影响到系统的运行。
  1. 如果一个客户端进程发生错误了,leader会将失败进程相关的工作重新分配给其他监控进程,确保所有切片都会分配到处理进程。
  2. 如果Schemaless triggers节点上的leader发生故障了,一个新的节点将会被选举作为leader,在leader选举的过程中,单元仍然会被执行,但是工作不能被重新分配,而且不能添加或移除工作进程。
  3. 如果公共存储(例如zookeeper)发生故障了,单元仍然会被处理,但是这种情况也像leader选举期间,不能重新分配工作,工作进程也不能添加或移除。
  4. 最后,Schemaless triggers框架与Schemaless 实例里面的故障是互相隔离的,任意数据库节点宕了都没问题,因为Schemaless triggers可以从他们的备份节点上读取。

总结
从运维的角度来看,Schemaless triggers是一个非常好的伙伴。Schemaless 是一个实时数据源理想的存储,因为这里的数据可以通过随机访问API或者通过日志类型访问API去访问。
另外使用Schemaless triggers的日志类型访问API可以将数据从生产者和消费者中解耦出来,让开发者只要关注逻辑处理而不必关心如何保证其扩展性和容错性。最后,我们可以在运行时添加更多的存储服务器去提升我们的性能和内存。
如今,Schemaless triggers框架是整个行程处理流中的核心驱动,包括将数据收进我们的分析数据仓库和跨数据中心的复制。我们对2016及以后未来的前景充满期待。

评论