From 5535a2e21cdc6be018d86f23fee8d1fb5d3c15c1 Mon Sep 17 00:00:00 2001
From: gowthaman
Date: Sat, 17 Jun 2023 23:17:25 +0530
Subject: [PATCH] auto schema generation

---
 main.py     | 120 ++++++++++++++++++++++++++++++++--------------------
 schema.json |  55 ------------------------
 2 files changed, 74 insertions(+), 101 deletions(-)
 delete mode 100644 schema.json

diff --git a/main.py b/main.py
index 8a937f8..cf4f232 100644
--- a/main.py
+++ b/main.py
@@ -10,7 +10,6 @@ import argparse
 parser = argparse.ArgumentParser()
 parser.add_argument("--input", help="folder containing input json(s)", required=True, type=pathlib.Path)
 parser.add_argument("--output", help="folder to place csv", required=True, type=pathlib.Path)
-parser.add_argument("--schema", help="schema json", required=True, type=pathlib.Path)
 parser.add_argument("--single", action="store_true", help="merge all json files to single output csv")
 parser.add_argument("--metadata", type=int, help="how many records to parse for building metadata", default=1000)
 parser.add_argument("--join-column", help="join column from top-level to merge nested json", required=True)
@@ -18,8 +17,6 @@ parser.add_argument("--name", help="join column from top-level to merge nested j
 
 args = parser.parse_args()
 
-schema = json.load(open(args.schema))
-
 
 class DBConn:
     def __init__(self):
@@ -73,25 +70,42 @@ dbConn = DBConn()
 
 
-def extract_child(merge_headers, item, k, ext, prev_step):
-    child_headers = merge_headers.get(k, [args.join_column])
+def extract_child(merge_headers, items_list, top_level, prev_step, existing, step):
+    headers = merge_headers.get(f"{prev_step}{top_level}", [args.join_column])
 
-    for child in item[k]:
+    flat_keys = existing.get("flat_keys", [])
+    extract_keys = existing.get("extract_keys", {})
+
+    # walk each child object of this nested list
+    for child in items_list:
         for (subKey, subValue) in child.items():
-            is_ext = subKey in ext.get("extract_keys", {}).keys()
-            if is_ext:
-                extract_child(merge_headers, child, subKey, ext["extract_keys"][subKey], f"{prev_step}{k}_")
-            else:
-                child_header = f"{k}_{subKey}"
-                if child_header not in child_headers:
-                    child_headers.append(child_header)
-    merge_headers[f"{prev_step}{k}"] = child_headers
+            is_dict = isinstance(subValue, dict)
+            is_list = isinstance(subValue, list)
+
+            if is_dict:
+                if subKey not in flat_keys:
+                    flat_keys.append(subKey)
+                for (sub2Key, sub2Value) in subValue.items():
+                    composite_key = f"{top_level}_{sub2Key}"
+                    if composite_key not in headers:
+                        headers.append(composite_key)
+            elif is_list:
+                # recurse, merging into any schema already inferred for this key
+                existing_next_step = extract_keys.get(subKey, {})
+                extract_keys[subKey] = extract_child(merge_headers, subValue, subKey, f"{prev_step}{top_level}_",
+                                                     existing_next_step, step + 1)
+            else:
+                child_header = f"{top_level}_{subKey}"
+                if child_header not in headers:
+                    headers.append(child_header)
+
+    merge_headers[f"{prev_step}{top_level}"] = headers
+    return {"flat_keys": flat_keys, "extract_keys": extract_keys}
 
 
 def extract_child_value(merge_headers, item, k, ext, prev_step, join_col_value):
-    child_value = {}
-    child_value[args.join_column] = join_col_value
+    child_value = {args.join_column: join_col_value}
     for child in item[k]:
         for (subKey, subValue) in child.items():
             is_ext = subKey in ext.get("extract_keys", {}).keys()
@@ -107,10 +121,9 @@
 
 
 def parse_json():
-    extract_keys = schema["extract_keys"]
-    flat_keys = schema["flat_keys"]
+    extract_keys = {}
+    flat_keys = []
 
-    extract_keys_names = extract_keys.keys()
 
     headers = []
     merge_headers = {}
 
@@ -124,56 +137,71 @@ def parse_json():
         return
 
     parsed = 0
-    parent = args.name
+    top_level = args.name
+
     # first pass, collect all headers
-    for item in ijson.items(open(input_path / input_files[0]), "item"):
+    for topLevelItem in ijson.items(open(input_path / input_files[0]), "item"):
         if parsed > args.metadata:
             print(f"parsed {parsed} records for metadata")
             break
-        keys = item.keys()
-        for k in keys:
-            if k in flat_keys:
-                for (fk, fv) in item[k].items():
-                    composite_key = f"{k}_{fk}"
+
+        for (key, value) in topLevelItem.items():
+            value_is_dict = isinstance(value, dict)
+            value_is_list = isinstance(value, list)
+
+            if value_is_dict:
+                if key not in flat_keys:
+                    flat_keys.append(key)
+
+                for (subKey, subValue) in value.items():
+                    composite_key = f"{key}_{subKey}"
                     if composite_key not in headers:
                         headers.append(composite_key)
-            elif k in extract_keys_names:
-                ext = extract_keys[k]
-                extract_child(merge_headers, item, k, ext, f"{parent}_")
-
+            elif value_is_list:
+                existing = extract_keys.get(key, {})
+                extract_keys[key] = extract_child(merge_headers, value, key, f"{top_level}_", existing, 1)
             else:
-                if k not in headers:
-                    headers.append(k)
+                if key not in headers:
+                    headers.append(key)
 
         parsed += 1
 
-    dbConn.make_table(parent, headers)
+    # dump the inferred schema so it can be inspected or reused later
+    with open("schema.json", "w") as schema_file:
+        json.dump({
+            "flat_keys": flat_keys,
+            "extract_keys": extract_keys
+        }, schema_file, indent=2)
+    dbConn.make_table(top_level, headers)
     for (mhKey, mhVal) in merge_headers.items():
         dbConn.make_table(mhKey, mhVal)
 
+    extract_keys_names = extract_keys.keys()
+
     # second pass: flatten the original json and write csv (we use sqlite, as it is faster to write)
     for inp_file in input_files:
-        for item in ijson.items(open(input_path / inp_file), "item"):
-            keys = item.keys()
+        for topLevelItem in ijson.items(open(input_path / inp_file), "item"):
+            keys = topLevelItem.keys()
             flat_json = {}
-            for k in keys:
-                if k in flat_keys:
-                    for (fk, fv) in item[k].items():
-                        composite_key = f"{k}_{fk}"
-                        flat_json[composite_key] = fv
-                elif k in extract_keys_names:
-                    ext = extract_keys[k]
-                    extract_child_value(merge_headers, item, k, ext, f"{parent}_", item[args.join_column])
+            for key in keys:
+                if key in flat_keys:
+                    for (subKey, subValue) in topLevelItem[key].items():
+                        composite_key = f"{key}_{subKey}"
+                        flat_json[composite_key] = subValue
+                elif key in extract_keys_names:
+                    ext = extract_keys[key]
+                    extract_child_value(merge_headers, topLevelItem, key, ext, f"{top_level}_",
+                                        topLevelItem[args.join_column])
                 else:
-                    flat_json[k] = item[k]
+                    flat_json[key] = topLevelItem[key]
 
-            dbConn.write_to_database(parent, flat_json)
+            dbConn.write_to_database(top_level, flat_json)
 
         if not args.single:
             dbConn.con.commit()
             dbConn.make_csv_from_tables(prefix=f"{pathlib.Path(args.output / inp_file).stem}-")
             dbConn.reinit_db()
-            dbConn.make_table(parent, headers)
+            dbConn.make_table(top_level, headers)
             for (mhKey, mhVal) in merge_headers.items():
                 dbConn.make_table(mhKey, mhVal)
diff --git a/schema.json b/schema.json
deleted file mode 100644
index 8987584..0000000
--- a/schema.json
+++ /dev/null
@@ -1,55 +0,0 @@
-{
-  "flat_keys": [
-    "cost_center",
-    "location",
-    "customer",
-    "event"
-  ],
-  "extract_keys": {
-    "customer.addresses": {
-      "flat_keys": [],
-      "extract_keys": {}
-    },
-    "customer.phone_numbers": {
-      "flat_keys": [],
-      "extract_keys": {}
-    },
-    "price_modifiers": {
-      "flat_keys": [],
-      "extract_keys": {}
-    },
-    "tender_items": {
-      "flat_keys": [],
-      "extract_keys": {}
-    },
"fee_items": { - "flat_keys": [], - "extract_keys": {} - }, - "tax_items": { - "flat_keys": [], - "extract_keys": {} - }, - "loyalty": { - "flat_keys": [], - "extract_keys": {} - }, - "sale_items": { - "flat_keys": [], - "extract_keys": { - "categories": { - "key": "categories" - }, - "modifiers": { - "key": "modifiers", - "flat_keys": [], - "extract_keys": { - "categories": { - "key": "categories" - } - } - } - } - } - } -} \ No newline at end of file