年超全超详细的最新大数据开发面试题,附答案解析

hadoop中常问的有三块,第一:存储,问到存储,就把HDFS相关的知识点拿出来;第二:计算框架(MapReduce);第三:资源调度框架(yarn)

HDFS写流程

1)client客户端发送上传请求,通过RPC与namenode建立通信,namenode检查该用户是否有上传权限,以及上传的文件是否在hdfs对应的目录下重名,如果这两者有任意一个不满足,则直接报错,如果两者都满足,则返回给客户端一个可以上传的信息

2)client根据文件的大小进行切分,默认128M一块,切分完成之后给namenode发送请求第一个block块上传到哪些服务器上

3)namenode收到请求之后,根据网络拓扑和机架感知以及副本机制进行文件分配,返回可用的DataNode的地址

4)客户端收到地址之后与服务器地址列表中的一个节点如A进行通信,本质上就是RPC调用,建立pipeline,A收到请求后会继续调用B,B在调用C,将整个pipeline建立完成,逐级返回client

5)client开始向A上发送第一个block(先从磁盘读取数据然后放到本地内存缓存),以packet(数据包,64kb)为单位,A收到一个packet就会发送给B,然后B发送给C,A每传完一个packet就会放入一个应答队列等待应答

6)数据被分割成一个个的packet数据包在pipeline上依次传输,在pipeline反向传输中,逐个发送ack(命令正确应答),最终由pipeline 中第一个 DataNode 节点 A 将 pipelineack 发送给 Client

7)当一个 block 传输完成之后, Client 再次请求 NameNode 上传第二个 block ,namenode重新选择三台DataNode给client

HDFS读流程

1)client向namenode发送RPC请求。请求文件block的位置

2)namenode收到请求之后会检查用户权限以及是否有这个文件,如果都符合,则会视情况返回部分或全部的block列表,对于每个block,NameNode 都会返回含有该 block 副本的 DataNode 地址; 这些返回的 DN 地址,会按照集群拓扑结构得出 DataNode 与客户端的距离,然后进行排序,排序两个规则:网络拓扑结构中距离 Client 近的排靠前;心跳机制中超时汇报的 DN 状态为 STALE,这样的排靠后

3)Client 选取排序靠前的 DataNode 来读取 block,如果客户端本身就是DataNode,那么将从本地直接获取数据(短路读取特性)

4)底层上本质是建立 Socket Stream(FSDataInputStream),重复的调用父类 DataInputStream 的 read 方法,直到这个块上的数据读取完毕

5)当读完列表的 block 后,若文件读取还没有结束,客户端会继续向NameNode 获取下一批的 block 列表

6)读取完一个 block 都会进行 checksum 验证,如果读取 DataNode 时出现错误,客户端会通知 NameNode,然后再从下一个拥有该 block 副本的DataNode 继续读

7)read 方法是并行的读取 block 信息,不是一块一块的读取;NameNode 只是返回Client请求包含块的DataNode地址,并不是返回请求块的数据

8) 最终读取来所有的 block 会合并成一个完整的最终文件

客户端读取完DataNode上的块之后会进行checksum 验证,也就是把客户端读取到本地的块与HDFS上的原始块进行校验,如果发现校验结果不一致,客户端会通知 NameNode,然后再从下一个拥有该 block 副本的DataNode 继续读

客户端上传文件时与DataNode建立pipeline管道,管道正向是客户端向DataNode发送的数据包,管道反向是DataNode向客户端发送ack确认,也就是正确接收到数据包之后发送一个已确认接收到的应答,当DataNode突然挂掉了,客户端接收不到这个DataNode发送的ack确认 ,客户端会通知 NameNode,NameNode检查该块的副本与规定的不符,NameNode会通知DataNode去复制副本,并将挂掉的DataNode作下线处理,不再让它参与文件上传与下载。

NameNode数据存储在内存和本地磁盘,本地磁盘数据存储在fsimage镜像文件和edits编辑日志文件

Secondary NameNode 是合并NameNode的edit logs到fsimage文件中;

这个问题就要说NameNode的高可用了,即 NameNode HA 一个NameNode有单点故障的问题,那就配置双NameNode,配置有两个关键点,一是必须要保证这两个NN的元数据信息必须要同步的,二是一个NN挂掉之后另一个要立马补上。

脑裂对于NameNode 这类对数据一致性要求非常高的系统来说是灾难性的,数据会发生错乱且无法恢复。Zookeeper 社区对这种问题的解决方法叫做 fencing,中文翻译为隔离,也就是想办法把旧的 Active NameNode 隔离起来,使它不能正常对外提供服务。

在进行 fencing 的时候,会执行以下的操作:

1) 首先尝试调用这个旧 Active NameNode 的 HAServiceProtocol RPC 接口的 transitionToStandby 方法,看能不能把它转换为 Standby 状态。 2) 如果 transitionToStandby 方法调用失败,那么就执行 Hadoop 配置文件之中预定义的隔离措施,Hadoop 目前主要提供两种隔离措施,通常会选择 sshfence: (1) sshfence:通过 SSH 登录到目标机器上,执行命令 fuser 将对应的进程杀死 (2) shellfence:执行一个用户自定义的 shell 脚本来将对应的进程隔离

Hadoop上大量HDFS元数据信息存储在NameNode内存中,因此过多的小文件必定会压垮NameNode的内存

每个元数据对象约占150byte,所以如果有1千万个小文件,每个文件占用一个block,则NameNode大约需要2G空间。如果存储1亿个文件,则NameNode需要20G空间

显而易见的解决这个问题的方法就是合并小文件,可以选择在客户端上传时执行一定的策略先合并,或者是使用Hadoop的CombineFileInputFormat<K,V\>实现小文件的合并

1)Client:客户端 (1)切分文件。文件上传HDFS的时候,Client将文件切分成一个一个的Block,然后进行存储 (2)与NameNode交互,获取文件的位置信息 (3)与DataNode交互,读取或者写入数据 (4)Client提供一些命令来管理HDFS,比如启动关闭HDFS、访问HDFS目录及内容等 2)NameNode:名称节点,也称主节点,存储数据的元数据信息,不存储具体的数据 (1)管理HDFS的名称空间 (2)管理数据块(Block)映射信息 (3)配置副本策略 (4)处理客户端读写请求 3)DataNode:数据节点,也称从节点。NameNode下达命令,DataNode执行实际的操作 (1)存储实际的数据块 (2)执行数据块的读/写操作 4)Secondary NameNode:并非NameNode的热备。当NameNode挂掉的时候,它并不能马上替换NameNode并提供服务 (1)辅助NameNode,分担其工作量 (2)定期合并Fsimage和Edits,并推送给NameNode (3)在紧急情况下,可辅助恢复NameNode

简单概述: inputFile通过split被切割为多个split文件,通过Record按行读取内容给map(自己写的处理逻辑的方法) ,数据被map处理完之后交给OutputCollect收集器,对其结果key进行分区(默认使用的hashPartitioner),然后写入buffer,每个map task 都有一个内存缓冲区(环形缓冲区),存放着map的输出结果,当缓冲区快满的时候需要将缓冲区的数据以一个临时文件的方式溢写到磁盘,当整个map task 结束后再对磁盘中这个maptask产生的所有临时文件做合并,生成最终的正式输出文件,然后等待reduce task的拉取

简单描述: Reduce 大致分为 copy、sort、reduce 三个阶段,重点在前两个阶段。copy 阶段包含一个 eventFetcher 来获取已完成的 map 列表,由 Fetcher 线程去 copy 数据,在此过程中会启动两个 merge 线程,分别为 inMemoryMerger 和 onDiskMerger,分别将内存中的数据 merge 到磁盘和将磁盘中的数据进行 merge。待数据 copy 完成之后,copy 阶段就完成了,开始进行 sort 阶段,sort 阶段主要是执行 finalMerge 操作,纯粹的 sort 阶段,完成之后就是 reduce 阶段,调用用户定义的 reduce 函数进行处理 详细步骤: 1) Copy阶段:简单地拉取数据。Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求maptask获取属于自己的文件(map task 的分区会标识每个map task属于哪个reduce task ,默认reduce task的标识从0开始)。

2) Merge阶段:这里的merge如map端的merge动作,只是数组中存放的是不同map端copy来的数值。Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活。merge有三种形式:内存到内存;内存到磁盘;磁盘到磁盘。默认情况下第一种形式不启用。当内存中的数据量到达一定阈值,就启动内存到磁盘的merge。与map 端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。第二种merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的文件。

3) 合并排序:把分散的数据合并成一个大的数据后,还会再对合并后的数据排序。

4) 对排序后的键值对调用reduce方法,键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对,最后把这些输出的键值对写入到HDFS文件中。

shuffle阶段分为四个步骤:依次为:分区,排序,规约,分组,其中前三个步骤在map阶段完成,最后一个步骤在reduce阶段完成 shuffle 是 Mapreduce 的核心,它分布在 Mapreduce 的 map 阶段和 reduce 阶段。一般把从 Map 产生输出开始到 Reduce 取得数据作为输入之前的过程称作 shuffle。

在shuffle阶段,可以看到数据通过大量的拷贝,从map阶段输出的数据,都要通过网络拷贝,发送到reduce阶段,这一过程中,涉及到大量的网络IO,如果数据能够进行压缩,那么数据的发送量就会少得多。 hadoop当中支持的压缩算法: gzip、bzip2、LZO、LZ4、Snappy,这几种压缩算法综合压缩和解压缩的速率,谷歌的Snappy是最优的,一般都选择Snappy压缩。谷歌出品,必属精品

规约(combiner)是不能够影响任务的运行结果的,局部汇总,适用于求和类,不适用于求平均值,如果reduce的输入参数类型和输出参数的类型是一样的,则规约的类可以使用reduce类,只需要在驱动类中指明规约的类即可

YARN的基本设计思想是将MapReduce V1中的JobTracker拆分为两个独立的服务:ResourceManager和ApplicationMaster。ResourceManager负责整个系统的资源管理和分配,ApplicationMaster负责单个应用程序的的管理。 1) ResourceManager: RM是一个全局的资源管理器,负责整个系统的资源管理和分配,它主要由两个部分组成:调度器(Scheduler)和应用程序管理器(Application Manager)。 调度器根据容量、队列等限制条件,将系统中的资源分配给正在运行的应用程序,在保证容量、公平性和服务等级的前提下,优化集群资源利用率,让所有的资源都被充分利用应用程序管理器负责管理整个系统中的所有的应用程序,包括应用程序的提交、与调度器协商资源以启动ApplicationMaster、监控ApplicationMaster运行状态并在失败时重启它。

2) ApplicationMaster: 用户提交的一个应用程序会对应于一个ApplicationMaster,它的主要功能有: a.与RM调度器协商以获得资源,资源以Container表示。 b.将得到的任务进一步分配给内部的任务。 c.与NM通信以启动/停止任务。 d.监控所有的内部任务状态,并在任务运行失败的时候重新为任务申请资源以重启任务。

3) nodeManager: NodeManager是每个节点上的资源和任务管理器,一方面,它会定期地向RM汇报本节点上的资源使用情况和各个Container的运行状态;另一方面,他接收并处理来自AM的Container启动和停止请求。

4) container: Container是YARN中的资源抽象,封装了各种资源。一个应用程序会分配一个Container,这个应用程序只能使用这个Container中描述的资源。 不同于MapReduceV1中槽位slot的资源封装,Container是一个动态资源的划分单位,更能充分利用资源。

当jobclient向YARN提交一个应用程序后,YARN将分两个阶段运行这个应用程序:一是启动ApplicationMaster;第二个阶段是由ApplicationMaster创建应用程序,为它申请资源,监控运行直到结束。 具体步骤如下: 1) 用户向YARN提交一个应用程序,并指定ApplicationMaster程序、启动ApplicationMaster的命令、用户程序。 2) RM为这个应用程序分配第一个Container,并与之对应的NM通讯,要求它在这个Container中启动应用程序ApplicationMaster。 3) ApplicationMaster向RM注册,然后拆分为内部各个子任务,为各个内部任务申请资源,并监控这些任务的运行,直到结束。 4) AM采用轮询的方式向RM申请和领取资源。 5) RM为AM分配资源,以Container形式返回 6) AM申请到资源后,便与之对应的NM通讯,要求NM启动任务。 7) NodeManager为任务设置好运行环境,将任务启动命令写到一个脚本中,并通过运行这个脚本启动任务 8) 各个任务向AM汇报自己的状态和进度,以便当任务失败时可以重启任务。 9) 应用程序完成后,ApplicationMaster向ResourceManager注销并关闭自己

在Yarn中有三种调度器可以选择:FIFO Scheduler ,Capacity Scheduler,Fair Scheduler

apache版本的hadoop默认使用的是capacity scheduler调度方式。CDH版本的默认使用的是fair scheduler调度方式

FIFO Scheduler(先来先服务):

FIFO Scheduler把应用按提交的顺序排成一个队列,这是一个先进先出队列,在进行资源分配的时候,先给队列中最头上的应用进行分配资源,待最头上的应用需求满足后再给下一个分配,以此类推。

FIFO Scheduler是最简单也是最容易理解的调度器,也不需要任何配置,但它并不适用于共享集群。大的应用可能会占用所有集群资源,这就导致其它应用被阻塞,比如有个大任务在执行,占用了全部的资源,再提交一个小任务,则此小任务会一直被阻塞。

Capacity Scheduler(能力调度器):

对于Capacity调度器,有一个专门的队列用来运行小任务,但是为小任务专门设置一个队列会预先占用一定的集群资源,这就导致大任务的执行时间会落后于使用FIFO调度器时的时间。

Fair Scheduler(公平调度器):

在Fair调度器中,我们不需要预先占用一定的系统资源,Fair调度器会为所有运行的job动态的调整系统资源。

比如:当第一个大job提交时,只有这一个job在运行,此时它获得了所有集群资源;当第二个小任务提交后,Fair调度器会分配一半资源给这个小任务,让这两个任务公平的共享集群资源。

需要注意的是,在Fair调度器中,从第二个任务提交到获得资源会有一定的延迟,因为它需要等待第一个任务释放占用的Container。小任务执行完成之后也会释放自己占用的资源,大任务又获得了全部的系统资源。最终的效果就是Fair调度器即得到了高的资源利用率又能保证小任务及时完成。

未被external修饰的是内部表(managed table),被external修饰的为外部表(external table)

Hive支持索引,但是Hive的索引与关系型数据库中的索引并不相同,比如,Hive不支持主键或者外键。

Hive索引可以建立在表中的某些列上,以提升一些操作的效率,例如减少MapReduce任务中需要读取的数据块的数量。

在可以预见到分区数据非常庞大的情况下,索引常常是优于分区的。

虽然Hive并不像事物数据库那样针对个别的行来执行查询、更新、删除等操作。它更多的用在多任务节点的场景下,快速地全表扫描大规模数据。但是在某些场景下,建立索引还是可以提高Hive表指定列的查询速度。(虽然效果差强人意)

适用于不更新的静态字段。以免总是重建索引数据。每次建立、更新数据后,都要重建索引以构建索引表。

hive在指定列上建立索引,会产生一张索引表(Hive的一张物理表),里面的字段包括,索引列的值、该值对应的HDFS文件路径、该值在文件中的偏移量; v0.8后引入bitmap索引处理器,这个处理器适用于排重后,值较少的列(例如,某字段的取值只可能是几个枚举值) 因为索引是用空间换时间,索引列的取值过多会导致建立bitmap索引表过大。

但是,很少遇到hive用索引的。说明还是有缺陷or不合适的地方的。

ORC和Parquet都是高性能的存储方式,这两种存储格式总会带来存储和性能上的提升

星形模式(Star Schema)是最常用的维度建模方式。星型模式是以事实表为中心,所有的维度表直接连接在事实表上,像星星一样。 星形模式的维度建模由一个事实表和一组维表成,且具有以下特点: a. 维表只和事实表关联,维表之间没有关联; b. 每个维表主键为单列,且该主键放置在事实表中,作为两边连接的外键; c. 以事实表为核心,维表围绕核心呈星形分布;

雪花模式(Snowflake Schema)是对星形模式的扩展。雪花模式的维度表可以拥有其他维度表的,虽然这种模型相比星型更规范一些,但是由于这种模型不太容易理解,维护成本比较高,而且性能方面需要关联多层维表,性能也比星型模型要低。所以一般不是很常用。

星座模式是星型模式延伸而来,星型模式是基于一张事实表的,而星座模式是基于多张事实表的,而且共享维度信息。前面介绍的两种维度建模方法都是多维表对应单事实表,但在很多时候维度空间内的事实表不止一个,而一个维表也可能被多个事实表用到。在业务发展后期,绝大部分维度建模都采用的是星座模式。

order by 会对输入做全局排序,因此只有一个reducer(多个reducer无法保证全局有序)只有一个reducer,会导致当输入规模较大时,需要较长的计算时间。

Hadoop/MapReduce和Spark最适合的都是做离线型的数据分析,但Hadoop特别适合是单次分析的数据量“很大”的情景,而Spark则适用于数据量不是很大的情景。

Hadoop底层使用MapReduce计算架构,只有map和reduce两种操作,表达能力比较欠缺,而且在MR过程中会重复的读写hdfs,造成大量的磁盘io读写操作,所以适合高时延环境下批处理计算的应用;

Spark是基于内存的分布式计算架构,提供更加丰富的数据集操作类型,主要分成转化操作和行动操作,包括map、reduce、filter、flatmap、groupbykey、reducebykey、union和join等,数据分析更加快速,所以适合低时延环境下计算的应用;

spark与hadoop最大的区别在于迭代式计算模型。基于mapreduce框架的Hadoop主要分为map和reduce两个阶段,两个阶段完了就结束了,所以在一个job里面能做的处理很有限;spark计算模型是基于内存的迭代式计算模型,可以分为n个阶段,根据用户编写的RDD算子和程序,在处理完一个阶段后可以继续往下处理很多个阶段,而不只是两个阶段。所以spark相较于mapreduce,计算模型更加灵活,可以提供更强大的功能。

但是spark也有劣势,由于spark基于内存进行计算,虽然开发容易,但是真正面对大数据的时候,在没有进行调优的轻局昂下,可能会出现各种各样的问题,比如OOM内存溢出等情况,导致spark程序可能无法运行起来,而mapreduce虽然运行缓慢,但是至少可以慢慢运行完。

spark非常重要的一个功能特性就是可以将RDD持久化在内存中。

调用cache()和persist()方法即可。cache()和persist()的区别在于,cache()是persist()的一种简化方式,cache()的底层就是调用persist()的无参版本persist(MEMORY_ONLY),将数据持久化到内存中。

如果需要从内存中清除缓存,可以使用unpersist()方法。RDD持久化是可以手动选择不同的策略的。在调用persist()时传入对应的StorageLevel即可。

应用场景:当spark应用程序特别复杂,从初始的RDD开始到最后整个应用程序完成有很多的步骤,而且整个应用运行时间特别长,这种情况下就比较适合使用checkpoint功能。

原因:对于特别复杂的Spark应用,会出现某个反复使用的RDD,即使之前持久化过但由于节点的故障导致数据丢失了,没有容错机制,所以需要重新计算一次数据。

Checkpoint首先会调用SparkContext的setCheckPointDIR()方法,设置一个容错的文件系统的目录,比如说HDFS;然后对RDD调用checkpoint()方法。之后在RDD所处的job运行结束之后,会启动一个单独的job,来将checkpoint过的RDD数据写入之前设置的文件系统,进行高可用、容错的类持久化操作。

检查点机制是我们在spark streaming中用来保障容错性的主要机制,它可以使spark streaming阶段性的把应用数据存储到诸如HDFS等可靠存储系统中,以供恢复时使用。具体来说基于以下两个目的服务:

最主要的区别在于持久化只是将数据保存在BlockManager中,但是RDD的lineage(血缘关系,依赖关系)是不变的。但是checkpoint执行完之后,rdd已经没有之前所谓的依赖rdd了,而只有一个强行为其设置的checkpointRDD,checkpoint之后rdd的lineage就改变了。

持久化的数据丢失的可能性更大,因为节点的故障会导致磁盘、内存的数据丢失。但是checkpoint的数据通常是保存在高可用的文件系统中,比如HDFS中,所以数据丢失可能性比较低

rdd分布式弹性数据集,简单的理解成一种数据结构,是spark框架上的通用货币。所有算子都是基于rdd来执行的,不同的场景会有不同的rdd实现类,但是都可以进行互相转换。rdd执行过程中会形成dag图,然后形成lineage保证容错性等。从物理的角度来看rdd存储的是block和node之间的映射。

RDD是spark提供的核心抽象,全称为弹性分布式数据集。

RDD在逻辑上是一个hdfs文件,在抽象上是一种元素集合,包含了数据。它是被分区的,分为多个分区,每个分区分布在集群中的不同结点上,从而让RDD中的数据可以被并行操作(分布式数据集)

比如有个RDD有90W数据,3个partition,则每个分区上有30W数据。RDD通常通过Hadoop上的文件,即HDFS或者HIVE表来创建,还可以通过应用程序中的集合来创建;RDD最重要的特性就是容错性,可以自动从节点失败中恢复过来。即如果某个结点上的RDD partition因为节点故障,导致数据丢失,那么RDD可以通过自己的数据来源重新计算该partition。这一切对使用者都是透明的。

RDD的数据默认存放在内存中,但是当内存资源不足时,spark会自动将RDD数据写入磁盘。比如某结点内存只能处理20W数据,那么这20W数据就会放入内存中计算,剩下10W放到磁盘中。RDD的弹性体现在于RDD上自动进行内存和磁盘之间权衡和切换的机制。

Spark streaming是spark core API的一种扩展,可以用于进行大规模、高吞吐量、容错的实时数据流的处理。

它支持从多种数据源读取数据,比如Kafka、Flume、Twitter和TCP Socket,并且能够使用算子比如map、reduce、join和window等来处理数据,处理后的数据可以保存到文件系统、数据库等存储中。

Spark streaming内部的基本工作原理是:接受实时输入数据流,然后将数据拆分成batch,比如每收集一秒的数据封装成一个batch,然后将每个batch交给spark的计算引擎进行处理,最后会生产处一个结果数据流,其中的数据也是一个一个的batch组成的。

DStream是spark streaming提供的一种高级抽象,代表了一个持续不断的数据流。

DStream可以通过输入数据源来创建,比如Kafka、flume等,也可以通过其他DStream的高阶函数来创建,比如map、reduce、join和window等。

Spark streaming一定是有一个输入的DStream接收数据,按照时间划分成一个一个的batch,并转化为一个RDD,RDD的数据是分散在各个子节点的partition中。

用户在client端提交作业后,会由Driver运行main方法并创建spark context上下文。执行add算子,形成dag图输入dagscheduler,按照add之间的依赖关系划分stage输入task scheduler。task scheduler会将stage划分为task set分发到各个节点的executor中执行。

Master实际上可以配置两个,Spark原生的standalone模式是支持Master主备切换的。当Active Master节点挂掉以后,我们可以将Standby Master切换为Active Master。

Spark Master主备切换可以基于两种机制,一种是基于文件系统的,一种是基于ZooKeeper的。

基于文件系统的主备切换机制,需要在Active Master挂掉之后手动切换到Standby Master上;

而基于Zookeeper的主备切换机制,可以实现自动切换Master。

数据倾斜以为着某一个或者某几个partition的数据特别大,导致这几个partition上的计算需要耗费相当长的时间。

在spark中同一个应用程序划分成多个stage,这些stage之间是串行执行的,而一个stage里面的多个task是可以并行执行,task数目由partition数目决定,如果一个partition的数目特别大,那么导致这个task执行时间很长,导致接下来的stage无法执行,从而导致整个job执行变慢。

避免数据倾斜,一般是要选用合适的key,或者自己定义相关的partitioner,通过加盐或者哈希值来拆分这些key,从而将这些数据分散到不同的partition去执行。

如下算子会导致shuffle操作,是导致数据倾斜可能发生的关键点所在:groupByKey;reduceByKey;aggregaByKey;join;cogroup;

这个问题的宗旨是问你spark sql 中dataframe和sql的区别,从执行原理、操作方便程度和自定义程度来分析 这个问题。

有hdfs文件,文件每行的格式为作品ID,用户id,用户性别。请用一个spark任务实现以下功能: 统计每个作品对应的用户(去重后)的性别分布。输出格式如下:作品ID,男性用户数量,女性用户数量

答案:

reduceByKey:reduceByKey会在结果发送至reducer之前会对每个mapper在本地进行merge,有点类似于在MapReduce中的combiner。这样做的好处在于,在map端进行一次reduce之后,数据量会大幅度减小,从而减小传输,保证reduce端能够更快的进行结果计算。

groupByKey:groupByKey会对每一个RDD中的value值进行聚合形成一个序列(Iterator),此操作发生在reduce端,所以势必会将所有的数据通过网络进行传输,造成不必要的浪费。同时如果数据量十分大,可能还会造成OutOfMemoryError。

所以在进行大量数据的reduce操作时候建议使用reduceByKey。不仅可以提高速度,还可以防止使用groupByKey造成的内存溢出问题。

不会的。

因为程序在运行之前,已经申请过资源了,driver和Executors通讯,不需要和master进行通讯的。

kafka消费消息的offset是定义在zookeeper中的, 如果想重复消费kafka的消息,可以在redis中自己记录offset的checkpoint点(n个),当想重复消费消息时,通过读取redis中的checkpoint点进行zookeeper的offset重设,这样就可以达到重复消费消息的目的了

kafka使用的是磁盘存储。

速度快是因为:

分三个点说,一个是生产者端,一个消费者端,一个broker端。

kafka的ack机制:在kafka发送数据的时候,每次发送消息都会有一个确认反馈机制,确保消息正常的能够被收到,其中状态有0,1,-1。

如果是同步模式: ack设置为0,风险很大,一般不建议设置为0。即使设置为1,也会随着leader宕机丢失数据。所以如果要严格保证生产端数据不丢失,可设置为-1。

如果是异步模式: 也会考虑ack的状态,除此之外,异步模式下的有个buffer,通过buffer来进行控制数据的发送,有两个值来进行控制,时间阈值与消息的数量阈值,如果buffer满了数据还没有发送出去,有个选项是配置是否立即清空buffer。可以设置为-1,永久阻塞,也就数据不再生产。异步模式下,即使设置为-1。也可能因为程序员的不科学操作,操作数据丢失,比如kill -9,但这是特别的例外情况。

通过offset commit 来保证数据的不丢失,kafka自己记录了每次消费的offset数值,下次继续消费的时候,会接着上次的offset进行消费。

而offset的信息在kafka0.8版本之前保存在zookeeper中,在0.8版本之后保存到topic中,即使消费者在运行过程中挂掉了,再次启动的时候会找到offset的值,找到之前消费消息的位置,接着消费,由于 offset 的信息写入的时候并不是每条消息消费完成后都写入的,所以这种情况有可能会造成重复消费,但是不会丢失消息。

每个broker中的partition我们一般都会设置有replication(副本)的个数,生产者写入的时候首先根据分发策略(有partition按partition,有key按key,都没有轮询)写入到leader中,follower(副本)再跟leader同步数据,这样有了备份,也可以保证消息数据的不丢失。

采集层 主要可以使用Flume, Kafka等技术。

Flume:Flume 是管道流方式,提供了很多的默认实现,让用户通过参数部署,及扩展API.

Kafka:Kafka是一个可持久化的分布式的消息队列。 Kafka 是一个非常通用的系统。你可以有许多生产者和很多的消费者共享多个主题Topics。

相比之下,Flume是一个专用工具被设计为旨在往HDFS,HBase发送数据。它对HDFS有特殊的优化,并且集成了Hadoop的安全特性。

所以,Cloudera 建议如果数据被多个系统消费的话,使用kafka;如果数据被设计给Hadoop使用,使用Flume。

kafka 宕机了,首先我们考虑的问题应该是所提供的服务是否因为宕机的机器而受到影响,如果服务提供没问题,如果实现做好了集群的容灾机制,那么这块就不用担心了。

想要恢复集群的节点,主要的步骤就是通过日志分析来查看节点宕机的原因,从而解决,重新恢复节点。

在 Kafka 中,生产者写入消息、消费者读取消息的操作都是与 leader 副本进行交互的,从 而实现的是一种主写主读的生产消费模型。 Kafka 并不支持主写从读,因为主写从读有 2 个很明显的缺点:

而kafka的主写主读的优点就很多了:

每个分区只能由同一个消费组内的一个消费者(consumer)来消费,可以由不同的消费组的消费者来消费,同组的消费者则起到并发的效果。

kafka只能保证partition内是有序的,但是partition间的有序是没办法的。爱奇艺的搜索架构,是从业务上把需要有序的打到同⼀个partition。

Client写入 -> 存入MemStore,一直到MemStore满 -> Flush成一个StoreFile,直至增长到一定阈值 -> 触发Compact合并操作 -> 多个StoreFile合并成一个StoreFile,同时进行版本合并和数据删除 -> 当StoreFiles Compact后,逐步形成越来越大的StoreFile -> 单个StoreFile大小超过一定阈值后(默认10G),触发Split操作,把当前Region Split成2个Region,Region会下线,新Split出的2个孩子Region会被HMaster分配到相应的HRegionServer 上,使得原先1个Region的压力得以分流到2个Region上

由此过程可知,HBase只是增加数据,没有更新和删除操作,用户的更新和删除都是逻辑层面的,在物理层面,更新只是追加操作,删除只是标记操作。

用户写操作只需要进入到内存即可立即返回,从而保证I/O高性能。

首先一点需要明白:Hbase是基于HDFS来存储的。

HDFS:

HBase:

Hbase 中的每张表都通过行键(rowkey)按照一定的范围被分割成多个子表(HRegion),默认一个HRegion 超过256M 就要被分割成两个,由HRegionServer管理,管理哪些 HRegion 由 Hmaster 分配。 HRegion 存取一个子表时,会创建一个 HRegion 对象,然后对表的每个列族(Column Family)创建一个 store 实例, 每个 store 都会有 0 个或多个 StoreFile 与之对应,每个 StoreFile 都会对应一个HFile,HFile 就是实际的存储文件,一个 HRegion 还拥有一个 MemStore实例。

热点现象:

某个小的时段内,对HBase的读写请求集中到极少数的Region上,导致这些region所在的RegionServer处理请求量骤增,负载量明显偏大,而其他的RgionServer明显空闲。

热点现象出现的原因:

HBase中的行是按照rowkey的字典顺序排序的,这种设计优化了scan操作,可以将相关的行以及会被一起读取的行存取在临近位置,便于scan。然而糟糕的rowkey设计是热点的源头。

热点发生在大量的client直接访问集群的一个或极少数个节点(访问可能是读,写或者其他操作)。大量访问会使热点region所在的单个机器超出自身承受能力,引起性能下降甚至region不可用,这也会影响同一个RegionServer上的其他region,由于主机无法服务其他region的请求。

热点现象解决办法:

为了避免写热点,设计rowkey使得不同行在同一个region,但是在更多数据情况下,数据应该被写入集群的多个region,而不是一个。常见的方法有以下这些:

长度原则:100字节以内,8的倍数最好,可能的情况下越短越好。因为HFile是按照 keyvalue 存储的,过长的rowkey会影响存储效率;其次,过长的rowkey在memstore中较大,影响缓冲效果,降低检索效率。最后,操作系统大多为64位,8的倍数,充分利用操作系统的最佳性能。

散列原则:高位散列,低位时间字段。避免热点问题。

唯一原则:分利用这个排序的特点,将经常读取的数据存储到一块,将最近可能会被访问 的数据放到一块。

原则:在合理范围内能尽量少的减少列簇就尽量减少列簇,因为列簇是共享region的,每个列簇数据相差太大导致查询效率低下。

最优:将所有相关性很强的 key-value 都放在同一个列簇下,这样既能做到查询效率最高,也能保持尽可能少的访问不同的磁盘文件。以用户信息为例,可以将必须的基本信息存放在一个列族,而一些附加的额外信息可以放在另一列族。

在 hbase 中每当有 memstore 数据 flush 到磁盘之后,就形成一个 storefile,当 storeFile的数量达到一定程度后,就需要将 storefile 文件来进行 compaction 操作。

Compact 的作用:

spark streaming 的 checkpoint 仅仅是针对 driver 的故障恢复做了数据和元数据的 checkpoint。而 flink 的 checkpoint 机制 要复杂了很多,它采用的是轻量级的分布式快照,实现了每个算子的快照,及流动中的数据的快照。

Flink中 WaterMark 和 Window 机制解决了流式数据的乱序问题,对于因为延迟而顺序有误的数据,可以根据eventTime进行业务处理,对于延迟的数据Flink也有自己的解决办法,主要的办法是给定一个允许延迟的时间,在该时间范围内仍可以接受处理延迟数据

保存延迟数据则是通过sideOutputLateData(outputTag: OutputTag[T])保存

Flink可以完全独立于Hadoop,在不依赖Hadoop组件下运行。但是做为大数据的基础设施,Hadoop体系是任何大数据框架都绕不过去的。Flink可以集成众多Hadooop 组件,例如Yarn、Hbase、HDFS等等。例如,Flink可以和Yarn集成做资源调度,也可以读写HDFS,或者利用HDFS做检查点。

有以下三个角色:

JobManager处理器:

也称之为Master,用于协调分布式执行,它们用来调度task,协调检查点,协调失败时恢复等。Flink运行时至少存在一个master处理器,如果配置高可用模式则会存在多个master处理器,它们其中有一个是leader,而其他的都是standby。

TaskManager处理器:

也称之为Worker,用于执行一个dataflow的task(或者特殊的subtask)、数据缓冲和data stream的交换,Flink运行时至少会存在一个worker处理器。

Clint客户端:

Client是Flink程序提交的客户端,当用户提交一个Flink程序时,会首先创建一个Client,该Client首先会对用户提交的Flink程序进行预处理,并提交到Flink集群中处理,所以Client需要从用户提交的Flink程序配置中获取JobManager的地址,并建立到JobManager的连接,将Flink Job提交给JobManager

在Flink中每个TaskManager是一个JVM的进程, 可以在不同的线程中执行一个或多个子任务。 为了控制一个worker能接收多少个task。worker通过task slot(任务槽)来进行控制(一个worker至少有一个task slot)。

Flink支持不同的重启策略,这些重启策略控制着job失败后如何重启:

固定延迟重启策略会尝试一个给定的次数来重启Job,如果超过了最大的重启次数,Job最终将失败。在连续的两次重启尝试之间,重启策略会等待一个固定的时间。

失败率重启策略在Job失败后会重启,但是超过失败率后,Job会最终被认定失败。在两个连续的重启尝试之间,重启策略会等待一个固定的时间。

Job直接失败,不会尝试进行重启。

Flink通过实现两阶段提交和状态保存来实现端到端的一致性语义。分为以下几个步骤:

开始事务(beginTransaction)创建一个临时文件夹,来写把数据写入到这个文件夹里面

预提交(preCommit)将内存中缓存的数据写入文件并关闭

正式提交(commit)将之前写完的临时文件放入目标目录下。这代表着最终的数据会有一些延迟

丢弃(abort)丢弃临时文件

若失败发生在预提交成功后,正式提交前。可以根据状态来提交预提交的数据,也可删除预提交的数据。

端到端的exactly-once对sink要求比较高,具体实现主要有幂等写入和事务性写入两种方式。

幂等写入的场景依赖于业务逻辑,更常见的是用事务性写入。而事务性写入又有预写日志(WAL)和两阶段提交(2PC)两种方式。

如果外部系统不支持事务,那么可以用预写日志的方式,把结果数据先当成状态保存,然后在收到 checkpoint 完成的通知时,一次性写入 sink 系统。

Flink 内部是基于 producer-consumer 模型来进行消息传递的,Flink的反压设计也是基于这个模型。Flink 使用了高效有界的分布式阻塞队列,就像 Java 通用的阻塞队列(BlockingQueue)一样。下游消费者消费变慢,上游就会受到阻塞。

Flink在做计算的过程中经常需要存储中间状态,来避免数据丢失和状态恢复。选择的状态存储策略不同,会影响状态持久化如何和 checkpoint 交互。Flink提供了三种状态存储方式:MemoryStateBackend、FsStateBackend、RocksDBStateBackend。

这道题问的比较开阔,如果知道Flink底层原理,可以详细说说,如果不是很了解,就直接简单一句话:Flink的开发者认为批处理是流处理的一种特殊情况。批处理是有限的流处理。Flink 使用一个引擎支持了DataSet API 和 DataStream API。

Flink 并不是将大量对象存在堆上,而是将对象都序列化到一个预分配的内存块上。此外,Flink大量的使用了堆外内存。如果需要处理的数据超出了内存限制,则会将部分数据存储到硬盘上。Flink 为了直接操作二进制数据实现了自己的序列化框架。

在流式处理中,CEP 当然是要支持 EventTime 的,那么相对应的也要支持数据的迟到现象,也就是watermark的处理逻辑。CEP对未匹配成功的事件序列的处理,和迟到数据是类似的。在 Flink CEP的处理逻辑中,状态没有满足的和迟到的数据,都会存储在一个Map数据结构中,也就是说,如果我们限定判断事件序列的时长为5分钟,那么内存中就会存储5分钟的数据,这在我看来,也是对内存的极大损伤之一。

数仓建设中,最重要的是数据准确性,数据的真正价值在于数据驱动决策,通过数据指导运营,在一个不准确的数据驱动下,得到的一定是错误的数据分析,影响的是公司的业务发展决策,最终导致公司的策略调控失败。

单表数据量监控

一张表的记录数在一个已知的范围内,或者上下浮动不会超过某个阈值

单表空值检测

某个字段为空的记录数在一个范围内,或者占总量的百分比在某个阈值范围内

单表重复值检测

一个或多个字段是否满足某些规则

跨表数据量对比

主要针对同步流程,监控两张表的数据量是否一致

数据商业分析的目标是利用大数据为所有职场人员做出迅捷,高质,高效的决策提供可规模化的解决方案。商业分析是创造价值的数据科学。

数据商业分析中会存在很多判断:

比如想知道线上渠道A、B各自带来了多少流量,新上线的产品有多少用户喜欢,新注册流中注册的人数有多少。这些都需要通过数据来展示结果。

我们需要知道渠道A为什么比渠道B好,这些是要通过数据去发现的。也许某个关键字带来的流量转化率比其他都要低,这时可以通过信息、知识、数据沉淀出发生的原因是什么。

在对渠道A、B有了判断之后,根据以往的知识预测未来会发生什么。在投放渠道C、D的时候,猜测渠道C比渠道D好,当上线新的注册流、新的优化,可以知道哪一个节点比较容易出问题,这些都是通过数据进行预测的过程。

所有工作中最有意义的还是商业决策,通过数据来判断应该做什么。这是商业分析最终的目的。

THE END
0.数据倾斜常见原因和解决办法数据倾斜的原因及解决办法数据倾斜常见原因和解决办法 数据倾斜在MapReduce编程模型中十分常见,多个节点并行计算,如果分配的不均,就会导致长尾问题(大部分节点都完成了任务,一直等待剩下的节点完成任务),本文梳理了常见的发生倾斜的原因以及相应的解决办法。 1.map端发生数据倾斜 产生原因:jvzquC41dnuh0lxfp0tfv8okcpmiwjnlkg5bt}neng5eg}fknu524;6668<3
1.2022年最强大数据面试宝典(全文50000字,强烈建议收藏)4. 热点现象(数据倾斜)怎么产生的,以及解决方法有哪些 热点现象: 某个小的时段内,对HBase的读写请求集中到极少数的Region上,导致这些region所在的RegionServer处理请求量骤增,负载量明显偏大,而其他的RgionServer明显空闲。 热点现象出现的原因: HBase中的行是按照rowkey的字典顺序排序的,这种设计优化了scan操作,可以jvzquC41dnuh0ryrwd4og}4922776A71xkkxuyfeg/8:3::371
2.2022年最强大数据面试宝典(全文50000字,建议收藏)(四)4. 热点现象(数据倾斜)怎么产生的,以及解决方法有哪些 热点现象: 某个小的时段内,对HBase的读写请求集中到极少数的Region上,导致这些region所在的RegionServer处理请求量骤增,负载量明显偏大,而其他的RgionServer明显空闲。 热点现象出现的原因: HBase中的行是按照rowkey的字典顺序排序的,这种设计优化了scan操作,可以jvzquC41fg|fnxugt0gmk‚zp0eun1jwvkerf1B5386:
3.HBase知识手册爱是与世界屏的技术博客Hbase和Hive在大数据架构中处在不同位置,Hbase主要解决实时数据查询问题,Hive主要解决海量数据处理和计算问题,一般是配合使用。 Hbase:Hadoop database 的简称,也就是基于Hadoop数据库,是一种NoSQL数据库,主要适用于海量明细数据(十亿、百亿)的随机实时查询,如日志明细、交易清单、轨迹行为等。 jvzquC41dnuh0>6evq4dqv4nqxkcg}ygtyusnm47;8?45B
4.大数据工程师面试题这一篇就够用了fsimage:记录的是数据块的位置信息、数据块的冗余信息(二进制文件) 由于edits 文件记录了最新状态信息,并且随着操作越多,edits 文件就会越大,把 edits 文件中最新的信息写到 fsimage 文件中就解决了 edits 文件数量多不方便管理的情况。 没有体现 HDFS 的最新状态。 jvzquC41yy}/lrfpuj{/exr1r1:3e<;cg37e7B
5.如何避免数据倾斜数据处理中数据倾斜和数据热点1、数据倾斜的表现 数据倾斜是由于数据分布不均匀,造成数据大量的集中到一点,造成数据热点的现象。 主要表现:任务进度长时间维持在 99%或者 100%的附近,查看任务监控页面,发现只有少量 reduce 子任务未完成,因为其处理的数据量和其他的 reduce 差异过大。 单一 reduce 处理的记录数和平均记录数相差太大,通常达到好jvzquC41dnuh0lxfp0tfv8|cfljlftillf5bt}neng5eg}fknu526<:9:374
6.分库分表方案中出现数据倾斜问题怎么解决分库分表数据倾斜二、 解决方案 1. 调整分片策略(治本之法) 2. 处理业务热点(治标之法) 3. 其他策略 三、 预防措施 总结 这是一个在分库分表实践中非常经典且棘手的问题。数据倾斜意味着数据并没有均匀地分布在不同数据库或表中,导致某些节点负载过高(存储、CPU、IO),而其他节点却非常空闲,从而成为系统瓶颈,严重影响整体性jvzquC41dnuh0lxfp0tfv8skgvgpl~s1ctzjeuj1fgzbkux137734=684
7.Hadoop·大数据技术栈·看云6.Hadoop解决数据倾斜方法 1、提前在 map 进行 combine,减少传输的数据量 2、导致数据倾斜的 key 加盐、提升 Reducer 并行度 … 7.Hadoop读文件和写文件流程 Hadoop读文件和写文件流程 HDFS-文件读写流程 8.Yarn的Job提交流程 步骤很多,理理清楚然后再有条理的进行回答。 jvzquC41yy}/mjsenq{e0ls1jcvq{ltfg/zpinyjgt5ckpicvc<03?<7746
8.37数据分布优化:如何应对数据倾斜?Redis核心技术与实战如使用配置更高的机器 只适用于只读的热点数据 解决方法 解决方法 解决方法 热点数据多副本 实例上存在热点数据 使用Hash Tag导致倾斜 Slot分配不均衡导致倾斜 bigkey导致倾斜 在构建切片集群时,尽量使用大小配置相同的实例,避免因实例资源不均衡而在不同实例上分配不同数量的Slot 应对方法 成因 成因 小建议 Redis jvzquC41vksf0pjgmdgoi7tti1ipn~rp1cxuklqg15695B8
9.举例说明Spark数据倾斜有哪些场景,对应的解决方案是什么?增加并行度,通过spark.sql.shuffle.partitions设置更高的 Shuffle 分区数,分散热点数据 [^1]。 场景三:Reduce Side Join 导致的倾斜 在没有合适优化手段的情况下,Join 操作只能在 Reduce 阶段完成,容易引发数据倾斜。 解决方法: 尝试将 Reduce Side Join 转换为 Map Side Join,前提是至少有一方数据量较小且可广jvzquC41ygtlw7hufp4og}4cpu}ft86jx|nv6?hy
10.阿里P8整理总结,入职大厂必备Java核心知识(附加面试题)ArrayList和LinkedList区别?HashMap内部数据结构?ConcurrentHashMap分段锁?jdk1.8中,对hashMap和concurrentHashMap做了哪些优化如何解决hash冲突的,以及如果冲突了,怎么在hash表中找到目标值synchronized 和 ReentranLock的区别?ThreadLocal?应用场景?Java GC机制?GC Roots有哪些?MySQL行锁是否会有死锁的情况?jvzquC41oconcr3ep1gsvrhng1jfvjnnAhoe?:<655955><(ghoe?|Tw|Q|yq@Gvec>Co95\mjG
11.新老手都值得看的Flink关键技术解析与优化实战上图为计算最小值的热点问题,红色数据为热点数据。如果直接将它们打到同一个分区,会出现性能问题。为了解决倾斜问题,我们通过 hash 策略将数据分成小的 partition 来计算,如上图的预计算,最后再将中间结果汇总计算。 当一切就绪后,我们来做增量的 UV 计算,比如计算 1 天 uv,每分钟输出 1 次结果。计算方式既可jvzquC41yy}/kwkqs0io1jwvkerf1\OTIQjDj{:PuHGGXPojZ
12.Redis数据库的数据倾斜详解Redis在服务端读数据访问Redis时,往往会对请求key进行分片计算,此时中会将请求打到某一台 Server 上,如果热点过于集中,热点 Key 的缓存过多,访问量超过 Server 极限时,就会出现缓存分片服务被打垮现象的产生。当缓存服务崩溃后,此时再有请求产生,就会打到DB 上,这也就是我们常说的缓存穿透,如果没有合理的解决,数据库jvzquC41yy}/lk:30pku1mfvcdgtg87;45;53}v0jvs
13.解决Redis数据倾斜热点等问题Redis单台机器的硬件配置有上限制约,一般我们会采用分布式架构将多台机器组成一个集群,这篇文章主要介绍了解决 Redis 数据倾斜、热点等问题,需要的朋友可以参考下+ 目录 GPT4.0+Midjourney绘画+国内大模型 会员永久免费使用!【 如果你想靠AI翻身,你先需要一个靠谱的工具!】 Redis 作为一门主流技术,应用场景非常多,很多jvzquC41yy}/lk:30pku1jwvkerf1;;;:9=/j}r
14.HBasehbase每秒最大写入多少5 热点现象( 数据倾斜) 怎么产生的, 以及解决方法有哪些 5.1热点现象   某个小的时段内, 对 HBase 的读写请求集中到极少数的 Region 上, 导致这些region 所在的 RegionServer 处理请求量骤增, 负载量明显偏大, 而其他的RgionServer 明显空闲。 jvzquC41dnuh0lxfp0tfv8vsa6676974:1gsvrhng1jfvjnnu173:;;677<
15.Hbase基本概念比如创建一张表,名为user,有两个列族,分别是userInfo和addressInfo,建表语句create 'user', 'userInfo', 'addressInfo' 3.Timestamp(时间戳):纪录每次操作数据的时间,通常作为数据的版本号 六. 热点现象(数据倾斜)怎么产生的,以及解决方法有哪些 热点现象: jvzquC41dnuh0lxfp0tfv8qkdcuxgw;2;1gsvrhng1jfvjnnu1738?<6987