2009年11月24日星期二

HadoopDB



分析数据库(Analytical DB)占有39.8亿/146亿(27%)的数据库市场.递增率是10.3%

Share-nothing : 无共享,指数据分布在不同的主机器,数据横向分割,相互之间是独立的.

Analytical DB vendor leader : Teradata.
Share-nothing project :
Oracle, Exadata
Microsoft, Madison

以share-nothing布署的DBMS称为并行数据库(parallel database)

HadoopDB 由三部分组成
1) PostgreSQL (or MySQL) : database layer
2) Hadoop : communication layer
3) Hive : translation layer,给SMS planner用的轻量修改版.

相关的项目:
1) Yahoo, Pig
2) Microsoft, SOCPE
3) Open source, Hive
4) Greenplum, 永亚软件
5) Aster Data

4,5加入了写MapReduce方法的支持.

期望的特性:
Performance : 低消耗,低底线也是高性能的表现
Fault Tolerance : 对于Analytics DB来说,没有要开机重运行的查询. 分析的工作负担是只读查询而且无更新. 机器越多,配置越低,失败机率越高,
运行在主机类型差异的环境的能力 : 主机差异可能性很高,并发查询的速度取决于最差的主机的性能
灵活的查询接口: UDF (user defined functions), SQL, non-SQL 都需要

背景和当前解决方案的缺点:
Gamma ; Grace parallel DBMS : Fault Tolerance, 差异环境的运行能力不行.
MapReduce: Performace不行

HadoopDB:
组件:
1) Data Connector : 连接MySQL和PostgreSQL,并做为Hadoop中InputFormat的实现. 是DBMS与TaskTracker之间的接口
2) Catalog : DB的元信息,如连接参数,数据集的分布,副本的位置,分区等等. 当前是以XML文件形式保存在HDFS上,以后想以独立服务的形式提供.
3) DataLoader : 装载时通过一个指定的分区键重分区数据,将单点的数据分为小的多块分区或块(chunk),及以批量的装载多个单点的数据库以块的形式, 分为两个,Global Hasher和LocalHasher
GlobalHasher从存在HDFS上原文件读出数据,并分区成和Cluster结点一样多的块,(估计)还保存在HDFS上
LocalHasher从HDFS上读出一个分区,并切分成更小的块(有最大块大小的设置,推荐1G)保存在本地的数据库中.
两个Hasher用的相同的hash方法,它的不同之处在于确保每个块有相同的大小.
4) SQL到MapReduce,再到SQL (简称SMS planner)
Hive将一个SQL查询(HiveQL)变为多个operator,以有向无环图的形式组织,分别交由MapReduce的两个阶段执行.Hive中有简单的优化器来优化operator.
SMS planner采用相同的方式, 但在Map阶段,直接对本地数据库(本地的Chunk)进行SQL的查询,这是因为每个Slave(TaskTracker)都有安装的数据库,对Reducer来说还要分成多个operator,(这应该是采用了Hive的技术).
对于SMS planner来说,针对不同类型的分区有两种情况,一是分区时已保证相同的主键被分到了一个TaskTracker, 这时候只要一个Map的操作就能完成,二是分区时不能保证结果集的主键分到同一个TraskTracker, 这时候要Map和Reduce两个阶段一起工作.

HadoopDB安装布署:
1)Java 1.6
2)Hadoop 0.19.1
3)安装DBMS在每个slave结点上.










2009年11月5日星期四

HBase 研究 -- HLog

HFile
==================================
HFile 有两个内部类
1) Reader
2) Writer
都是公开的静态的.

HFile结构:

Reader装入HFile通过 LoadFileInfo方法实现
1) 从文件尾读到Trailer
2) 在Trailer结构中读到FileInfo
3) 通过Trailer结构中读出data index
4) 通过Trailer结构中读出meta index

index 结构是由HFile.BlockIndex类实现
BlockIndex有三个数组,分别是
1) blockKeys byte[][]
2) blockOffsets long[]
3) blockDataSize int[]

1)是一个组合的Key
2,3) 指向了Value域. Value有可能是压缩的,因此,应从当前的blockOffset[]中读出到下个下一个blockOffset[]为止,读出的字节解压缩后大小应是blockDataSize[]中读出的值

HFile.Reader读出的data block可以cache在BlockCache中. BlockCache是一个接口,实现有LruBlockCache,SimpleBlockCache等,

对于BlockCache来说,存入一个block的方法是调用cacheBlock,参数是一个blockName字串,和一个用ByteBuffer类型表示的Buffer.

******************************************

HRegion
==================================
checkReadOnly()方法会抛异常.

2009/11/09
HLog 文件位于hdfs://主机:<9000>/hbase/.log/,<60020>,/


2009/11/11
TransactionalRegionServer
==================================
是HRegionServer的替代,实现了TransactionRegionInterface
与Transaction相关的方法是beginTransaction, delete, get, put, openScanner, commit, commitIfPossible, commitRequest,

与Transaction相关的有TransactionId (long),TransactionState.
TransactionId来自于客户端,HBaseBackedTransactionLogger.createNewTransactionLog()的返回值,从内部实现看是一个随机数, TransactionId是Client与Server之间最基本的联系

TransactionState在Server与Client端各有一个,Client是org.apache.hadoop.hbase.client.transactional.TransactionState. Client端的TransactionState是一个封装了TransactionId及相关的Region,也就是说一个Transaction关联多个Region.

Server端的TransactionState,是org.apache.hadoop.hbase.regionserver.transactional.TransactionState. 真正保存了Transaction的状态(PENDING(初始), COMMIT_PENDING(确定可以提交了), COMMITED(提交了) or ABORTED(放弃))),保存的所有读(Get的row key),所有的Scan(ScanRange), 所有的删除(Delete),以及所有的写(Put), 以及所有可能冲突的TransctionState的集合(set).

在Server端,Transaction是被委托到Region来进行的.Client与Server之间联系就是TransactionId,及一个确定的Region Name.即是通过TransactionalRegion类来实现.
TransactionRegion通过OCC机制来完成Transaction,但由于一个Transaction跨多个Region,所以,所有的Region必须都确保可以提交才能提交. 这是由Client端的org.apache.hadoop.hbase.client.transactional.TransactionManager来完成.如果在client端中间提交的过程中(也就是所有Region都确定可以提交一个Transaction后),有失败发生.则我们要重新决定Transaction,这个目前没有实现.

Transaction采用两阶段提交,1)prepareCommit, 2)doCommit.两阶段提交中,Get是即时的,也就是说,在prepareCommit之前就已经有结果返回了. 在Transaction中,对于一条特定的row, Get会读原表得到一个结果,并且当前Transaction有对同一row操作的Put(这个put 肯定1)在这个Get之前, 2)还未真正执行)的话,会读出这个Put并生一个合并的结果.
Put, Delete是在doCommit中执行的,并且没有考虑顺序问题(Bug).

TransactionalRegionServer在实例化时会创建一个Leases线程,用于做Transaction的监控.Leases线程会定时检查一个Lease是否过期,如果过期,则会回调这个Lease的Listener来发布这个消息. Leases采用了DelayQueue来操作Lease.
Transaction的Lease是在这个Region的Transaction创建时创建(TransactionRegion.beginTransaction), 对应的LeaseListener是TransactionLeaseListener, 如果Lease对应的Transaction是PENDING(初始)的,则直接设置它状态为ABORT,如果是COMMIT_PENDING, 则进行CommitPendingWaits计数,并更新Lease.如果CommitPendingWaits的计数到10(hard code的),则将这个Transaction通过Log做提交. 但由于全局Log没有实现,当前的做法是直接提交.

有关Scan --Scanner
==================================
HRegionServer实现的HRegionInterface中含有如下方法与Scanner相关
1) openScanner
2) 两个next函数来取回Scan结果
3) close来结束一个Scan
Scan的Handle是由openScanner返回的scannerId.
TransactionalRegionServer新加入了一个openScanner的方法,这个方法能接额外的transactionId参数.
Scan采用Scanner类来完成工作.即接口InternalScanner的实现类
InternalScanner的有两个方法:
1) next 用来取得下一个row的结果集一个KeyValue的list. (应是针对于多个Column)
2) close
InternalScanner用于返回一条row记录,包括了在scan中指定的多个column.

HRegionServer中用一个支持并发的HashMap来保存所有的InternalScanner,键即是scannerId(一个随机数)的转换成的字符串.
HRegionServer中采用的InternalScanner的实现类是RegionScanner.
RegionScanner内部委托KeyValueHeap来取得结果. RegionScanner对于传入的Scan,生成相应的多个KeyValueScanner.并与addictional的KeyValueScanner合并生成KeyValueHeap. 前者Scanner是从Store生成的,而一个Store取一个Column Family有关.另从Store取出的KeyValueScanner其实会从两个地方读取KeyValue的值 : 1)MemStore (Cache); 2) 多个StoreFile (真正的存贮文件,每个StoreFile对应于一个Column(Column Family : Qualifier)).

KeyValueScanner用于取得一个KeyValue.即一个Column的value,Column做为一个Key已经包括Row key, column family, qualifier, 及timestamp.

KeyValueHeap实现了KeyValueScanner及InternalScanner接口.内部有三个成员.一个当前的KeyValueScanner, 一个PriorityQueue存放其它的KeyValueScanner.以及一个比较器用来对PriorityQueue中的KeyValueScanner进行排序. 因此KeyValueHeap的工作原理是在内部的多KeyValueScanner中进行查找,以找出最前的一个Row相关的所有KeyValue对.来返回.

TransactionRegionServer中会另加入一个来自TransactionState中的Scanner,这个Scanner是PutScanner, PutScanner实现KeyValueScanner及InternalScanner接口.实例化时,PutScanner从当前的TransactionState中取得所有的Put并且放入KeyValueScanner的一个集合中.