import argparse
import json
import pathlib
import sqlite3
import time

import ijson
import pandas as pd

parser = argparse.ArgumentParser()
parser.add_argument("--input", help="folder containing input json(s)", required=True, type=pathlib.Path)
parser.add_argument("--output", help="folder to place csv", required=True, type=pathlib.Path)
parser.add_argument("--single", action="store_true", help="merge all json files into a single output csv")
parser.add_argument("--metadata", type=int, help="how many records to parse for building metadata", default=1000)
parser.add_argument("--join-column", help="join column from the top level used to link nested json", required=True)
parser.add_argument("--name", help="name of the top-level entity; used for table, database and csv names", required=True)
args = parser.parse_args()


class DBConn:
    # thin wrapper around a sqlite database used as a staging area for the flattened rows

    def __init__(self):
        self.cur = None
        self.con = None
        self.counter = 0
        self.ts = ''
        self.reinit_db()

    def reinit_db(self):
        # open a fresh database file; called once per input file unless --single is set
        self.counter += 1
        self.ts = time.strftime('%Y%m%d_%H%M%S', time.localtime())
        self.con = sqlite3.connect(args.output / f"data-{args.name}-{self.ts}-{self.counter}.db")
        self.cur = self.con.cursor()

    def make_table(self, tbl_name, cols):
        syspk = "syspk integer primary key autoincrement"
        other_cols = ', '.join(f"{f} TEXT" for f in cols)
        create_tbl_sql = f"create table if not exists {tbl_name} ({syspk}, {other_cols})"
        self.cur.execute(create_tbl_sql)

    def write_to_database(self, tbl, cols):
        keys = cols.keys()  # dicts preserve insertion order, so columns and values stay aligned
        col_names = ', '.join(keys)
        value_placeholders = ', '.join("?" for _ in keys)
        # trim values of surrounding spaces
        values = tuple(str(cols[k]).strip() for k in keys)
        sql = f"insert into {tbl} ({col_names}) values ({value_placeholders})"
        self.cur.execute(sql, values)

    def make_csv_from_tables(self, prefix=''):
        # export every table belonging to this run; they are all prefixed with args.name
        self.cur.execute("SELECT name FROM sqlite_master WHERE type='table'")
        tbls = [tbl for (tbl,) in self.cur.fetchall() if tbl.startswith(args.name)]
        for tbl in tbls:
            df = pd.read_sql(f"SELECT * FROM {tbl}", self.con)
            df.to_csv(args.output / f"{prefix}{tbl}.csv", index=False)


dbConn = DBConn()


def extract_child(merge_headers, items_list, top_level, prev_step, existing, step):
    # first pass helper: collect the column headers for a nested list of objects
    headers = merge_headers.get(top_level, [args.join_column])
    flat_keys = existing.get("flat_keys", [])
    extract_keys = existing.get("extract_keys", {})
    for child in items_list:
        for (subKey, subValue) in child.items():
            is_dict = isinstance(subValue, dict)
            is_list = isinstance(subValue, list)
            if is_dict:
                if subKey not in flat_keys:
                    flat_keys.append(subKey)
                for sub2Key in subValue:
                    composite_key = f"{top_level}_{sub2Key}"
                    if composite_key not in headers:
                        headers.append(composite_key)
            if is_list:
                # deeper nested list: recurse so the second pass can write it to its own table
                if subKey not in extract_keys:
                    existing_next_step = extract_keys.get(subKey, {})
                    extract_keys[subKey] = extract_child(merge_headers, subValue, subKey,
                                                         f"{prev_step}{top_level}_", existing_next_step, step + 1)
            else:
                # scalars (and dicts, which are not lists) get a single <top_level>_<subKey> column
                child_header = f"{top_level}_{subKey}"
                if child_header not in headers:
                    headers.append(child_header)
    merge_headers[f"{prev_step}{top_level}"] = headers
    return {"flat_keys": flat_keys, "extract_keys": extract_keys}


def extract_child_value(merge_headers, item, k, ext, prev_step, join_col_value):
    # second pass helper: write each element of a nested list as a row in its own table
    for child in item[k]:
        # rebuild the row for every child so values from a previous sibling do not leak in
        child_value = {args.join_column: join_col_value}
        for (subKey, subValue) in child.items():
            is_ext = subKey in ext.get("extract_keys", {})
            if is_ext:
                extract_child_value(merge_headers, child, subKey, ext["extract_keys"][subKey],
                                    f"{prev_step}{k}_", join_col_value)
            else:
                child_header = f"{k}_{subKey}"
                child_value[child_header] = subValue
        k_ = f"{prev_step}{k}"
        dbConn.write_to_database(k_, child_value)


def parse_json():
    extract_keys = {}
    flat_keys = []
    headers = []
    merge_headers = {}
    input_path = args.input
    input_files = [x.name for x in input_path.iterdir() if x.is_file() and x.name.endswith(".json")]
    print(f"found input file(s) {', '.join(input_files)} in path {args.input}")
    if len(input_files) == 0:
        print("could not find any input files, we shall stop")
        return

    parsed = 0
    top_level = args.name

    # first pass: sample records from the first file to collect all headers
    with open(input_path / input_files[0], "rb") as sample_file:
        for topLevelItem in ijson.items(sample_file, "item"):
            if parsed >= args.metadata:
                print(f"parsed {parsed} records for metadata")
                break
            for (key, value) in topLevelItem.items():
                value_is_dict = isinstance(value, dict)
                value_is_list = isinstance(value, list)
                if value_is_dict:
                    # nested object: flatten into <key>_<subKey> columns
                    if key not in flat_keys:
                        flat_keys.append(key)
                    for subKey in value:
                        composite_key = f"{key}_{subKey}"
                        if composite_key not in headers:
                            headers.append(composite_key)
                elif value_is_list:
                    # nested list: collect headers for its own table
                    existing = extract_keys.get(key, {})
                    extract_keys[key] = extract_child(merge_headers, value, key, f"{top_level}_", existing, 1)
                else:
                    if key not in headers:
                        headers.append(key)
            parsed += 1

    with open("schema.json", "w") as schema_file:
        schema_file.write(json.dumps({"flat_keys": flat_keys, "extract_keys": extract_keys}, indent=2))

    dbConn.make_table(top_level, headers)
    for (mhKey, mhVal) in merge_headers.items():
        dbConn.make_table(mhKey, mhVal)
    extract_keys_names = extract_keys.keys()

    # second pass: flatten every record and stage it in sqlite (faster to write than csv), then export csv
    for inp_file in input_files:
        with open(input_path / inp_file, "rb") as f:
            for topLevelItem in ijson.items(f, "item"):
                flat_json = {}
                for key in topLevelItem.keys():
                    if key in flat_keys:
                        # nested object: spread its fields into the flat row
                        for (subKey, subValue) in topLevelItem[key].items():
                            composite_key = f"{key}_{subKey}"
                            flat_json[composite_key] = subValue
                    elif key in extract_keys_names:
                        # nested list: write to its own table, linked back via the join column
                        ext = extract_keys[key]
                        extract_child_value(merge_headers, topLevelItem, key, ext,
                                            f"{top_level}_", topLevelItem[args.join_column])
                    else:
                        flat_json[key] = topLevelItem[key]
                dbConn.write_to_database(top_level, flat_json)
        if not args.single:
            # one set of csv files per input file, then start a fresh database for the next file
            dbConn.con.commit()
            dbConn.make_csv_from_tables(prefix=f"{pathlib.Path(inp_file).stem}-")
            dbConn.reinit_db()
            dbConn.make_table(top_level, headers)
            for (mhKey, mhVal) in merge_headers.items():
                dbConn.make_table(mhKey, mhVal)

    if args.single:
        dbConn.con.commit()
        dbConn.make_csv_from_tables()


if __name__ == '__main__':
    parse_json()