"""Flatten nested transaction JSON into SQLite tables and export them as CSVs.

Two streaming passes over ``transactions.json``:

1. collect column headers for the parent table and every nested child table;
2. insert one row per record (and rows for nested lists) into SQLite.

Top-level dict fields listed in ``flat_keys`` are flattened into the parent row
as ``<field>_<member>`` columns; list fields described by ``extract_keys`` are
written (recursively) to child tables named ``transactions_<path>``.  Finally
every ``transactions*`` table is dumped to ``<table>.csv``.
"""
import argparse
import pathlib
import sqlite3

import ijson
import pandas as pd

parser = argparse.ArgumentParser()
parser.add_argument("-i", "--input", help="folder containing input json(s)", required=True, type=pathlib.Path)
parser.add_argument("-o", "--output", help="folder to place csv", required=True, type=pathlib.Path)
parser.add_argument("-s", "--single", action="store_true", help="merge all json files to single output csv")
parser.add_argument("-j", "--join-column", help="join column from top-level to merge nested json", required=True)
parser.add_argument("-m", "--metadata", type=int, help="how many records to parse for building metadata", default=100)

# Dict-valued top-level fields whose members are flattened into the parent row.
flat_keys = ["cost_center", "location", "customer"]

# List-valued fields that get their own tables; the nesting mirrors the JSON
# ("sale_items" contains "categories" and "modifiers", which again contain
# "categories").  "flat_keys"/"extract_keys" entries left empty are unused here
# but keep the shape uniform for the recursive walkers below.
extract_keys = {
    "price_modifiers": {
        "flat_keys": [],
        "extract_keys": {}
    },
    "sale_items": {
        "flat_keys": [],
        "extract_keys": {
            "categories": {
                "key": "categories",
            },
            "modifiers": {
                "key": "modifiers",
                "flat_keys": [],
                "extract_keys": {
                    "categories": {
                        "key": "categories",
                    }
                }
            }
        }
    }
}

parent = "transactions"

# Module-level connection/cursor shared by all helpers below.
con = sqlite3.connect(f"{parent}.db")
cur = con.cursor()


def extract_child(merge_headers, item, k, ext, prev_step):
    """First pass: accumulate column headers for the child table holding item[k].

    Recurses into sub-lists named in ext["extract_keys"]; every other key of a
    child dict becomes a "<k>_<subkey>" column.  Results accumulate across
    records in merge_headers, keyed by the fully-prefixed table name.
    """
    table_name = f"{prev_step}{k}"
    # BUG FIX: headers are stored under the prefixed table name, so they must be
    # looked up under that same name.  The original looked up bare `k`, which
    # always missed and silently discarded headers collected from earlier
    # records — the child tables ended up with only the last record's columns.
    child_headers = merge_headers.get(table_name, [])
    for child in item[k]:
        for (subKey, subValue) in child.items():
            if subKey in ext.get("extract_keys", {}):
                extract_child(merge_headers, child, subKey,
                              ext["extract_keys"][subKey], f"{prev_step}{k}_")
            else:
                child_header = f"{k}_{subKey}"
                if child_header not in child_headers:
                    child_headers.append(child_header)
    merge_headers[table_name] = child_headers


def extract_child_value(merge_headers, item, k, ext, prev_step):
    """Second pass: write the values of the nested list item[k] to its table.

    Mirrors extract_child: recurses into sub-lists, flattens every other key
    into "<k>_<subkey>" columns, then inserts into the prefixed child table.

    NOTE(review): a single row is written per call, merging all children of
    item[k]; children sharing a key overwrite each other.  If one row per
    child element is intended, the insert belongs inside the loop — confirm
    against the expected CSV output.
    """
    child_value = {}
    for child in item[k]:
        for (subKey, subValue) in child.items():
            if subKey in ext.get("extract_keys", {}):
                extract_child_value(merge_headers, child, subKey,
                                    ext["extract_keys"][subKey], f"{prev_step}{k}_")
            else:
                child_header = f"{k}_{subKey}"
                child_value[child_header] = subValue
    k_ = f"{prev_step}{k}"
    write_to_database(k_, child_value)


def make_table(tbl_name, cols):
    """Create table tbl_name with an autoincrement PK and TEXT columns cols.

    NOTE(review): tbl_name/cols come from JSON keys and are interpolated into
    the SQL as identifiers (placeholders cannot be used for identifiers).
    Safe only for trusted input files — consider validating/quoting the names.
    """
    syspk = "syspk integer primary key autoincrement"
    other_cols = ', '.join([f"{f} TEXT" for f in cols])
    create_tbl_sql = f"create table if not exists {tbl_name} ({syspk}, {other_cols})"
    cur.execute(create_tbl_sql)


def write_to_database(tbl, cols):
    """Insert one row into tbl; cols maps column name -> value.

    Values are stringified and stripped (everything is stored as TEXT); the
    values themselves go through placeholders, only identifiers are
    interpolated (see make_table's note).
    """
    keys = cols.keys()
    col_names = ', '.join([x for x in keys])
    value_placeholders = ', '.join(["?" for x in keys])
    values = tuple([str(cols[k]).strip() for k in keys])
    sql = f"insert into {tbl} ({col_names}) values({value_placeholders})"
    cur.execute(sql, values)


def make_csv_from_tables():
    """Dump every table whose name starts with 'transactions' to <table>.csv."""
    cur.execute("SELECT name FROM sqlite_master WHERE type='table'")
    tbls = []
    for (tbl,) in cur.fetchall():
        # startswith replaces the original `tbl.find("transactions") == 0`.
        if tbl.startswith("transactions"):
            tbls.append(tbl)
    for tbl in tbls:
        clients = pd.read_sql(f"SELECT * FROM {tbl}", con)
        clients.to_csv(f"{tbl}.csv", index=False)


def parse_json():
    """Run the two-pass flatten over transactions.json and export CSVs."""
    extract_keys_names = extract_keys.keys()
    headers = []
    merge_headers = {}
    # First pass: collect all headers (ijson streams the file, so arbitrarily
    # large inputs are handled without loading everything into memory).
    # `with` ensures the file handle is closed (the original leaked it).
    with open("transactions.json") as f:
        for item in ijson.items(f, "item"):
            for k in item.keys():
                if k in flat_keys:
                    for (fk, fv) in item[k].items():
                        composite_key = f"{k}_{fk}"
                        if composite_key not in headers:
                            headers.append(composite_key)
                elif k in extract_keys_names:
                    extract_child(merge_headers, item, k, extract_keys[k], f"{parent}_")
                else:
                    if k not in headers:
                        headers.append(k)
    make_table(parent, headers)
    for (mhKey, mhVal) in merge_headers.items():
        make_table(mhKey, mhVal)
    # Second pass: flatten each record and insert it (SQLite is used as the
    # staging store because bulk inserts are faster than incremental CSV writes).
    with open("transactions.json") as f:
        for item in ijson.items(f, "item"):
            flat_json = {}
            for k in item.keys():
                if k in flat_keys:
                    for (fk, fv) in item[k].items():
                        flat_json[f"{k}_{fk}"] = fv
                elif k in extract_keys_names:
                    extract_child_value(merge_headers, item, k, extract_keys[k], f"{parent}_")
                else:
                    flat_json[k] = item[k]
            write_to_database(parent, flat_json)
    con.commit()
    make_csv_from_tables()


if __name__ == '__main__':
    # Parsed here rather than at import time so importing this module does not
    # demand CLI arguments (the original called parser.parse_args() at module
    # level and discarded the result).
    # TODO(review): none of the options are consulted yet — input file, output
    # location, --single, --join-column and --metadata are all hard-coded or
    # unused above; wire them up.
    args = parser.parse_args()
    parse_json()