轻易云数据集成平台:源平台调度者生命周期详解
轻易云数据集成平台为企业提供了一套完整的数据处理流程,包括数据抽取、清洗、转换及转发等关键步骤。本文深入探讨平台中“数据抽取”阶段的核心组件——源平台调度者的生命周期管理及操作流程,确保开发工程师能够充分理解并有效地实施数据集成方案。
定时任务调度
调度者分配与配置
轻易云平台预设了20个调度者(dispatcher-0至dispatcher-19),通过Linux crontab计划任务实现每分钟一次的调度命令执行。在数据集成方案的配置页面中,用户可指定“调度号”以分配特定调度者,这样可以优化性能,防止单个调度者的任务阻塞。
调度命令执行
每个调度者将执行以下命令,根据分配的“调度号”识别并启动相应的集成方案:
cd ./dispatcher && php dispatcher-[0~19] schedule:run
集成方案遍历与调度命令生成
异步方案识别
在遍历集成方案时,首先排除非异步方案,这些通常由事件触发或外部系统激活。
调度命令创建
对于需要调度的方案,将创建一个或多个调度命令,例如:
dispatch:datahub [task_id] --source --asyn
以及对应目标平台的调度命令。这些命令将根据crontab延迟执行。
补漏命令生成
如果集成方案配置了补漏措施,则会创建一个特殊的补漏命令,例如:
dispatch:datahub [task_id] --source --asyn --omission
按照配置的补漏crontab执行。
调度命令执行与队列管理
调度命令激活
到达指定时间点后,创建的调度命令将被激活,转化为特定格式并放入AsynDispatcher队列池中排队,等待处理。
队列池任务消费
AsynDispatcher队列池将对排队的任务进行处理,包括再次确认任务启动条件、检查调度条件是否满足,以及执行具体的调度工作。
适配器加载与任务调度
适配器加载
调度过程中首先加载集成方案配置的源平台适配器。
任务调度执行
适配器初始化后,执行以下方法,负责完成具体的调度任务:
$adapter->dispatch()
包括插入调度日志、生成任务请求参数、将新任务参数写入任务存储、将任务ID插入源任务队列池进行排队,最终插入调度结束日志。
异步队列池任务消费
异步队列池(AsynDispatcher队列池)的任务消费过程是数据抽取阶段的核心。以下步骤详细描述了从任务验证到执行的流程:
任务验证与条件检查
任务状态确认
消费任务前,首先确认任务是否处于启动状态,并检查是否满足调度条件。这包括验证是否有前置任务正在执行,以确保调度的顺序性和依赖性。
调度条件满足后的处理
一旦确认任务满足所有调度条件,系统将调用以下方法来执行具体的调度工作:
Instance::handleSourceDispatch()
适配器操作与任务执行
适配器加载与初始化
系统将加载指定的源平台适配器,并初始化适配器参数。
执行适配器调度方法
初始化后,适配器的以下方法被调用:
dispatch()
执行包括插入调度开始日志、生成任务请求参数、将请求参数写入异步源任务存储、将任务ID插入源任务队列池、插入调度结束日志等步骤。
数据抽取与处理
适配器完成任务调度后,接下来的步骤涉及数据的实际抽取和处理:
数据抽取任务的执行
任务详情获取
根据任务ID,从异步源任务存储中获取任务的详细信息,包括请求参数等。
适配器连接与执行
适配器首先尝试连接到源平台,确保连接成功后,执行具体的数据请求操作,通常通过调用以下方法完成:
invoke()
数据处理与转换
响应数据处理
任务执行后,适配器处理源平台的响应数据,包括检查响应状态、处理成功或失败的响应、以及对成功响应的数据进行进一步处理。
数据加工与转换
在适配器处理响应数据后,可能会触发脚本加工厂的调度方法,对数据进行加工和转换,以满足集成方案的需求。
任务状态管理与日志记录
任务状态更新
任务完成状态标记
完成数据处理后,任务将被标记为“已完成”状态,确保系统正确跟踪任务的执行结果。
错误处理与重排
如果任务执行失败,系统将执行错误处理流程,包括异常记录和判断是否需要任务重新排队。
日志记录
调度日志
从调度开始到结束,系统将记录详细的调度日志,包括任务开始、结束时间,以及任何重要的状态变更信息。
适配器日志
适配器操作过程中产生的日志也被记录,包括连接状态、数据处理结果等,以便于问题诊断和性能优化。
源平台事件关联与触发
最后,轻易云数据集成平台支持源平台事件关联,允许一个集成方案的执行触发其他方案的调度:
事件关联配置
源平台事件关联
在某些场景中,一个集成方案的成功执行可能需要触发另一个方案的调度。这通过配置源平台事件关联来实现。
后续方案的调度触发
当一个方案成功完成后,系统将根据配置的事件关联检查是否需要触发其他集成方案的调度,确保数据集成流程的连续性和自动化。
异步队列池中任务的具体执行过程
任务的获取与验证
获取任务详情
对于异步队列池中的每个任务,系统首先通过任务ID从以下方法获取任务的具体详情,包括执行所需的所有参数:
getAsynSourceJobStorage()
任务状态检查
系统接着检查任务的当前状态。如果任务已经标记为错误或已完成,系统将不会继续执行该任务,并将其从队列中移除。