大数据量处理思路

一、整体思路

一句话:尽可能的多worker去干!

如果worker是线程,那就是多线程

如果worker是JVM进程,那就是集群、分布式

如果worker是数据库,那就是读写分离

如果worker是库,那就是分库技术

如果worker是表,那就是分表技术

大家回想初中降低噪音的措施有哪些:减少污染源,增加传播介质,减少去污染源多的区域。就是从声源发出,传播,接收三个方面去优化。那数据不也是数据源来源数据处理数据落地三个方面吗?

那么我们就从以下三个方面去着重优化

  • 数据来源

  • 数据处理

  • 数据落地

二、数据来源

大数据量场景下的数据来源主要有大的两个方面:第三方数据来源本地数据来源,这不是扯废话吗?数据要么是别人的,要么是自己的,就这两种情况。

第三方数据来源的处理方式要么是推(push)要么是拉(pull)

  • 推(push):推的方式开发中经常见到,要么别人推送到你的http接口里面,例如微信公众号有人关注和取消关注,支付成功的回调事件,但是这种一般在对接大型公司的时候才有;要么就是让你订阅他们的消息队列,他们会推送到消息队列中,订阅了就可以接收到了,但是这种情况比较少。推的情况一般是推送变更数据给你。

  • 拉(pull):拉的方式在三方数据对接的时候是最常见的,对于提供方比较方便,别人写好一个接口或者开放一个视图,你自己对接的时候就自己去拉取吧。

无论是哪种数据来源,都有一个重要的问题需要考虑,就是第一次数据处理后续数据处理方案。因为第一次一般是将整个数据搞过来,后续的数据处理我们往往都只需要处理变更的数据。如果提供方正好是支持拉所有推变更,那这个数据提供方是真好。我们在对接过程中就可以省很多事情了。但是大部分情况就是只有一个拉的接口给你,或者再多一个详情接口。

我们就不讨论,第三方将变更数据通过推的方式了,这种对接比较简单。着重说下拉的方式,批量推的也可以参考拉的方式。

拉的方式我们着重以http接口形式说明:

调用别人http接口,一般是个列表接口,还是分页的,需要我们传入分页参数,分页去获取。

  • JVM调优:一般处理大数据量,可以适当调整jvm参数,避免内存不足,频繁FULL_GC,可以调整堆内存新生代元空间的内存,此处不细讲。

  • 多线程:多个线程去获取,高效,但是容易出现线程安全问题。一般都是推荐使用线程池的方式来创建多线程,尽量使用ThreadPoolExecutor构造器手动创建线程池,避免任务队列过长引起OOM,另外也要避免父子任务共用线程池的情况,如果父任务把线程用完了,在父任务中调用子任务的时候可能会因为没有可用线程引起死锁问题。

    线程安全问题:我们常用的ArrayListHashSetHashMap这时候就不能使用了,天天背面试题这个线程不安全,那个线程不安全,这时候就是排上用场的时候了

    • ArrayList可以替换成CopyOnWriteArrayListCopyOnWriteArrayList适用于读多写少的场景,性能远远好于 Vector

    • 对于Map结构,可以使用Collections.synchronizedMap()该方法只是在Map的相关方法上封装了一下,加了个synchronized修饰,所以建议使用ConcurrentHashMapConcurrentHashMap底层实现是synchronizedCAS自旋的方,可以在线程安全的基础上保证性能。

    • 对于公共变量的要用volatile关键字修饰,一些数量的变化也要用原子类去操作。

    • 某些变量的定义位置要合理,例如有些iffor大括号中的变量,因为定义不合理,就会引起GC无法回收当前变量,导致内存泄漏。

  • 多实例和多线程:我们可以让一个jvm进程利用多线程去处理,但是一个jvm进程多线程数量毕竟有限,当然我们可以无限加内存和CPU资源,来增加线程数量,但是往往硬件资源增加过多,带来的价格提升往往是指数倍的。我们可以用多台实例,每台实例都用多线程处理数据。

    多实例和多线程的分工问题:对于单台jvm进程,我们可以用多线程让每个线程去访问不同的页数,从而加快访问速度。在这个场景下,我们的代码是可以控制的,在多实例的情况下,每个实例的代码是一样的,我们怎么控制不同的jvm进程拿到不同的数据呢?

    那就要用到分片广播的功能,我们让不同的jvm进程拿到不同的页的范围,例如有100w数据,两个jvm进程,我们可以让第一台拿到奇数页去处理,另一台拿到偶数页数据去处理,这样就相当于每台jvm进程只处理50w的数据,效率肯定远远高于单台处理100w的数据的。

    对于分片广播多个JVM进程自己是没办法处理,必须要有个协调者来给他们分配任务让他们去执行。可以使用第三方定时任务组件,例如xxl_job处理分片,例如一共四台机器,执行定时任务的时候,组件会调用四台机器同时执行定时任务,然后传入当前的分片索引分片总数,这样就相当于四个人干活了,还有个问题需要注意,如何确保每个机器干活不重复呢?通常的做法是:每台机器只处理(A % 分片总数==分片索引)的数据即可,A通常是该条数据的序号(在总数据量下的序号)

上面只说明了在首次数据全量获取的时候的方案,还有重要的一部分就是增量获取问题:

增量获取的内容是新增的数据修改的数据,要看第三方提供增量数据的方式

  • 主动推送增量数据:当数据变更时对方主动推送增量数据,这种处理就比较方便,直接处理即可,这种情况更新的数据双方系统具有很好的实时性。

  • 提供了支持时间查询的接口:这个时候我们只需要根据时间查询即可,然后定时去查询最近时间段新增和修改的即可,双方数据有一点的延时性。

    数据漏查的问题:出现数据漏查的现象是,一般我们数据查询是按照秒级查询的,一秒中就可能产生多条数据,但是我们查询的时刻,可能这一秒中有的数据还没生成,下次查询的时候,我们就不查询这个时间点了,那么这些数据就会漏掉。

    解决方案:在增量查询过程中,多查询几秒,来确保数据的完整性,多查询就会意味着,我们查询的数据会出现重复数据,我们就要在程序上解决是否有重复数据,数据重复比数据缺少更好处理。

  • 只有一个列表接口:这种就是最恶心的场景,第三方就是懒省事,把锅甩给后人。遇到这种情况只能每次去全量查询,然后在自己这边进行比对,这里提供几种方式,还有其他的欢迎留言

    • 删了重新写入:这种方式比较粗暴。拿到对方的数据,先根据对方数据的唯一标识在我们这边查询,有的话删除掉,然后执行插入,新增的话直接插入即可,当然如果业务情况允许可以直接删除整张表,然后再重新插入。

    • 增加临时表:如果业务不允许的情况下,我们可以增加临时表,处理完成之后,将结果更新到目标表即可。

    • 增加一列冗余字段存放hash值:类似HashMap的hash算法,将数据转化为一个hash值,首先根据对方的唯一主键查看是否是否存在,不存在就插入,存在的话计算hash值,与数据的计算好的hash比较是否相同,但是这种情况可能会存在hash冲突的现象。

    • 还有一种就是查出来,挨个字段比较一遍,是否有变更数据,这种情况很耗性能。

还有几点需要注意

  • 三方接口限流:我记得刚入行的时候,调用钉钉通讯录同步接口,凌晨调用有时候成功,有时候失败,我就纳闷了好久,最后它的文档最后有一句小字,单接口每秒只能调用300次,不是针对某家企业的限流,是所有企业共同争夺这300次机会,这就很坑了基本上就是2点,3点去同步,谁搞个2:06去同步呀,这时候就要有失败重试措施了。

  • 三方接口限制条数:有些接口对分页条数会有限制,例如单接口最大只返回1000条数据,避免你直接来个Integer.MAX_VALUE循环调用给人家干蹦了。

  • 网络波动问题:网络波动,可能引起接口响应超时。

    • 如果是读取超时,我们重试几次即可,如果重试仍然失败,一般做法是把这些错误的数据记录下来,后面统一再次重试,这里可以参考微信支付会调的场景,每隔如下时间进行重试(15s/15s/30s/3m/10m/20m/30m/30m/30m/60m/3h/3h/3h/6h/6h),如果还是不成功,那就要靠人工介入了。

    • 如果是写入对方时超时,就需要对方提供接口,查询一下是否写入成功了,避免二次写入的问题。当然对方接口应该也会加上重复数据的判断。

拉取的另一种方式是从别人的数据库里面获取:

从别人数据库里面获取,一般都是给你个视图,让你自己调用,这时候一般也是分页去获取,具体的方案和http调用基本一致,只不过一个是网络中获取,一个是数据库查询。

对接视图也需要注意:

  • 数据视图限流:虽然数据库层面不会直接限制调用次数,但是提供方也担心有老六,疯狂轮询他们数据库,因为你这个三方的视图拖垮了他们整个服务,这时候也要向他们确认一下是不是有限流的措施,例如人家搞个触发器之类的限制你的调用频率和次数。

三、数据处理

数据处理没什么特别说明的地方,还是多实例多线程多用缓存去操作数据,一个人干的猛,一群人都要这么猛的干!

四、数据落地

数据落地其实就是和数据库打交道的阶段了,还是遵循一次要干的多干的猛,一群人这样干的思想。

主要从以下几个方面去考虑:

  • 物理优化:一个数据库猛干或者一群数据库猛干

    • 增加单个数据库性能(一个数据库猛干)

    • 读写分离(一群库猛干)

  • 优化数据库系统:以MySQL为例,在MySQL数据库系统中,很多参数都是默认的,但是在大数据量的情况下,就会捉襟见肘了。可以增大缓冲池大小增大查询缓存大小修改日志文件写入时机增大事务缓冲区大小等。

  • 优化表结构:可以进行数据分片Sharding Sphere),把一个表拆成多个表,每张表的压力就小了,也可以水平拆分或者横向拆分。

  • 优化SQL:添加sql索引避免回表操作。一次回表操作带来的损耗不明显,大数据量下带来的就很明显。

  • 批量插入/更新:进行sql拼接,减少与数据库连接次数。

  • 多线程:执行写入或者更新的时候,代码层面也是可以多线程批量写入的

五、异常数据处理

在数据处理过程中,可能会异常情况

  1. 脏数据处理报错

  2. 网络波动

  3. 对方服务宕机

  4. 己方服务宕机

无论是什么原因引起的中断,都要围绕两个问题:异常数据怎么办?要不要继续下去?

异常数据有可能是脏数据,也有可能是处理一半的数据

  • 对于脏数据,我们的程序一般就是无法处理了,这种数据需要保存下来,人工处理。

  • 处理一半的数据,这时候我们可以进行分段处理(可以参考MySQL redo_log的二阶段提交),数据处理之前,我们可以先将操作变量存入redis中,然后进行数据处理,开始进行下一段数据处理的时候,删除上一段的操作变量,如果中途失败,我们可以根据redis的结果进行纠正,然后继续从复活点继续向后处理。当然也可以在数据库中增加一个状态字段,用于判断写入的状态。

大数据量处理,要不要事务?

对于大数据量,可以使用事务,但是要少用,大量事务堆积,也会拖垮整个数据库。不过也要根据实际业务情况判断,不用事务就要自己处理事务,或者自定义事务范围,多少次事务之后提交数据。