原创 轻易云数据集成平台的数据存储机制与实现

发布时间:
更新时间:
浏览次数:1011
评论数:0

轻易云数据集成平台采用了一种高效的数据存储机制,专门为支持数据集成方案设计。核心思想是为每个租户分配一个独立的MongoDB数据库,并在其中为每个集成方案创建独立的数据集合(Collection),以确保数据隔离和安全。

数据接收和预处理是数据集成过程中的关键环节。当一个数据集成任务启动时,平台首先从源系统接收数据,例如以下JSON格式的数据:

{
    "code": 0,
    "message": "",
    "trades": [
        {"rec_id": "1", "shop_no": "xyp2test", "tid": "115580935277840368-1", "logistics_type": "12"},
        {"rec_id": "2", "shop_no": "115580935277840368-2", "tid": "test0002", "logistics_type": "12"}
    ]
}

此时,源平台适配器解析接收到的数据,提取trades数组中的元素,并将它们逐一写入轻易云数据集成平台的存储系统(ADATA)。写入过程中,可以配置特定字段如idnumber等,以支持后续的数据处理和分析。

处理复杂的数据结构是另一挑战。平台能够处理具有多级数据结构的情况。例如,以下JSON展示了复杂的多级数据结构:

{
    "code": 0,
    "message": "",
    "total_count": 1,
    "trades": [
        {
            "trade_id": "168498",
            "trade_no": "JY201905180001",
            "platform_id": "0",
            "warehouse_type": "1",
            "src_tids": "115580935277840368-1",
            "pay_account": "默认账户",
            "trade_status": "30",
            "goods_list": [
                {"rec_id": "2944911", "trade_id": "168498", "spec_id": "1", "num": "2.0000", "price": "99.0000", "paid": "198.0000", "goods_name": "旺店通手持终端", "goods_id": "1", "goods_no": "WDTPDA"},
                {"rec_id": "2944912", "trade_id": "168498", "spec_id": "1", "num": "2.0000", "price": "99.0000", "paid": "198.0000", "goods_name": "旺店通手持终端", "goods_id": "1", "goods_no": "WDTPDA"}
            ]
        }
    ]
}

默认情况下,平台会遍历trades数组,将其每个对象作为独立单元进行存储。对于更复杂的结构,如goods_list数组,平台提供了拍扁功能,将嵌套结构拍扁,转化为独立的记录行,以满足不同的数据分析需求。

核心数据存储(ADATA)

每个集成方案的核心数据存储在一个名为{{方案id}}_ADATA的Collection中,包含以下信息:

{
    "_id": "65228287911dac2fbd2e551e",
    "id": "CRK2023100719403_1",
    "number": "CRK2023100719403_1",
    "content": {},
    "status": 0,
    "relation_id": null,
    "response": null,
    "response_at": 0,
    "created_at": 0.0,
    "source_job_id": "6522777d007a976b6c36d34b",
    "target_job_id": null,
    "dispatch_begin": 0,
    "dispatch_end": 0,
    "dispatch_time": 0,
    "result": []
}

这些字段包括:

  • _id: 数据记录的唯一标识符。
  • id: 业务主键ID。
  • number: 业务编号。
  • content: 业务数据内容。
  • status: 数据状态码,例如0(等待处理),1(重复),2(已完成),3(错误),5(队列中),6(跳过),7(不处理)。
  • relation_id: 关联的其他数据记录ID。
  • response: 目标平台的响应内容。
  • response_at: 响应时间戳。
  • created_at: 数据创建时间戳。
  • source_job_id: 源平台任务唯一标识符。
  • target_job_id: 目标平台任务唯一标识符。
  • dispatch_begin: 分派处理开始时间戳。
  • dispatch_end: 分派处理结束时间戳。
  • dispatch_time: 分派处理所花费的时间。
  • result: 存储处理结果的数组。

运行日志存储(LOG)

为了记录执行细节和状态,{{方案id}}_LOG Collection用于存储运行日志:

{
    "_id":"6516600ef94af3187d5efd2c",
    "text":"00_debug_建立连接",
    "content":{},
    "status":4,
    "created_at":0.0
}

这些字段包括:

  • _id: 日志记录唯一标识符。
  • text: 日志文本信息,如“建立连接”。
  • content: 日志详细内容。
  • status: 日志级别,例如0(记录),1(警告),等等。

通过这种结构化的数据存储设计,轻易云数据集成平台能够高效、准确地处理和存储各种复杂的数据集成需求。