import argparse
import json
import pathlib
import sqlite3
import time

import ijson
import pandas as pd

parser = argparse.ArgumentParser()
parser.add_argument("--input", help="folder containing input json(s)", required=True, type=pathlib.Path)
parser.add_argument("--output", help="folder to place csv", required=True, type=pathlib.Path)
parser.add_argument("--schema", help="schema json", required=True, type=pathlib.Path)
parser.add_argument("--single", action="store_true", help="merge all json files to single output csv")
parser.add_argument("--metadata", type=int, help="how many records to parse for building metadata", default=1000)
parser.add_argument("--join-column", help="join column from top-level to merge nested json", required=True)
parser.add_argument("--name", help="name of the top-level entity, used as the parent table name", required=True)
args = parser.parse_args()

with open(args.schema) as schema_file:
    schema = json.load(schema_file)
print(f"join column {args.join_column}")


class DBConn:
    """Throwaway sqlite database used as a fast staging area before writing CSVs."""

    def __init__(self):
        self.cur = None
        self.con = None
        self.counter = 0
        self.ts = ''
        self.reinit_db()

    def reinit_db(self):
        # a fresh database per output batch keeps per-input-file CSVs independent
        self.counter += 1
        self.ts = time.strftime('%Y%m%d_%H%M%S', time.localtime())
        self.con = sqlite3.connect(args.output / f"data-{args.name}-{self.ts}-{self.counter}.db")
        self.cur = self.con.cursor()

    def make_table(self, tbl_name, cols):
        # every column is TEXT; a synthetic autoincrement key preserves insertion order
        syspk = "syspk integer primary key autoincrement"
        other_cols = ', '.join(f"{f} TEXT" for f in cols)
        create_tbl_sql = f"create table if not exists {tbl_name} ({syspk}, {other_cols})"
        self.cur.execute(create_tbl_sql)

    def write_to_database(self, tbl, cols):
        keys = list(cols.keys())  # dicts preserve insertion order
        col_names = ', '.join(keys)
        value_placeholders = ', '.join("?" for _ in keys)
        # trim values of spaces
        values = tuple(str(cols[k]).strip() for k in keys)
        sql = f"insert into {tbl} ({col_names}) values ({value_placeholders})"
        self.cur.execute(sql, values)

    def make_csv_from_tables(self, prefix=''):
        self.cur.execute("SELECT name FROM sqlite_master WHERE type='table'")
        # export only the tables created by this run: the parent table and its
        # nested child tables all share the args.name prefix
        tbls = [tbl for (tbl,) in self.cur.fetchall() if tbl.startswith(args.name)]
        for tbl in tbls:
            frame = pd.read_sql(f"SELECT * FROM {tbl}", self.con)
            frame.to_csv(args.output / f"{prefix}{tbl}.csv", index=False)


dbConn = DBConn()


def extract_child(merge_headers, item, k, ext, prev_step):
    """First pass: collect the column headers of the child table for nested list `k`."""
    # accumulate headers across records under the fully-prefixed table name
    child_headers = merge_headers.get(f"{prev_step}{k}", [args.join_column])
    for child in item[k]:
        for (subKey, subValue) in child.items():
            if subKey in ext.get("extract_keys", {}):
                # the child itself contains a nested list: recurse one level deeper
                extract_child(merge_headers, child, subKey, ext["extract_keys"][subKey], f"{prev_step}{k}_")
            else:
                child_header = f"{k}_{subKey}"
                if child_header not in child_headers:
                    child_headers.append(child_header)
    merge_headers[f"{prev_step}{k}"] = child_headers


def extract_child_value(merge_headers, item, k, ext, prev_step, join_col_value):
    """Second pass: write one row per element of nested list `k`, tagged with the join column."""
    for child in item[k]:
        # reset per child so values from one row never leak into the next
        child_value = {args.join_column: join_col_value}
        for (subKey, subValue) in child.items():
            if subKey in ext.get("extract_keys", {}):
                extract_child_value(merge_headers, child, subKey, ext["extract_keys"][subKey],
                                    f"{prev_step}{k}_", join_col_value)
            else:
                child_value[f"{k}_{subKey}"] = subValue
        dbConn.write_to_database(f"{prev_step}{k}", child_value)


def parse_json():
    extract_keys = schema["extract_keys"]
    flat_keys = schema["flat_keys"]
    extract_keys_names = extract_keys.keys()
    headers = []
    merge_headers = {}
    input_path = args.input
    input_files = [x.name for x in input_path.iterdir() if x.is_file() and x.name.endswith(".json")]
    print(f"found input file(s) {', '.join(input_files)} in path {args.input}")
    if len(input_files) == 0:
        print("could not find any input files, we shall stop")
        return
    parsed = 0
    parent = args.name
    # first pass: stream the first file to collect all headers
    with open(input_path / input_files[0]) as first_file:
        for item in ijson.items(first_file, "item"):
            if parsed > args.metadata:
                print(f"parsed {parsed} records for metadata")
                break
            for k in item.keys():
                if k in flat_keys:
                    # nested dict: merge its keys into the parent row as k_subkey columns
                    for fk in item[k]:
                        composite_key = f"{k}_{fk}"
                        if composite_key not in headers:
                            headers.append(composite_key)
                elif k in extract_keys_names:
                    # nested list: collect headers for a separate child table
                    extract_child(merge_headers, item, k, extract_keys[k], f"{parent}_")
                elif k not in headers:
                    headers.append(k)
            parsed += 1
    dbConn.make_table(parent, headers)
    for (mhKey, mhVal) in merge_headers.items():
        dbConn.make_table(mhKey, mhVal)
    # second pass: flatten every record and stage it in sqlite (faster to write than csv)
    for inp_file in input_files:
        with open(input_path / inp_file) as f:
            for item in ijson.items(f, "item"):
                flat_json = {}
                for k in item.keys():
                    if k in flat_keys:
                        for (fk, fv) in item[k].items():
                            flat_json[f"{k}_{fk}"] = fv
                    elif k in extract_keys_names:
                        extract_child_value(merge_headers, item, k, extract_keys[k],
                                            f"{parent}_", item[args.join_column])
                    else:
                        flat_json[k] = item[k]
                dbConn.write_to_database(parent, flat_json)
        if not args.single:
            # one set of CSVs per input file, then start over with a fresh database
            dbConn.con.commit()
            dbConn.make_csv_from_tables(prefix=f"{pathlib.Path(inp_file).stem}-")
            dbConn.reinit_db()
            dbConn.make_table(parent, headers)
            for (mhKey, mhVal) in merge_headers.items():
                dbConn.make_table(mhKey, mhVal)
    if args.single:
        dbConn.con.commit()
        dbConn.make_csv_from_tables()


if __name__ == '__main__':
    parse_json()
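# ---------------------------------------------------------------------------
# Usage sketch. Everything below is illustrative: the script name, file and
# folder names, the join column, and the schema contents are hypothetical,
# chosen only to show the shapes the code above expects.
#
#   python flatten_json.py \
#       --input ./in --output ./out --schema schema.json \
#       --join-column order_id --name orders
#
# Each input file under ./in is a JSON array of records. schema.json tells the
# script how to flatten a record:
#
#   {
#     "flat_keys": ["customer"],
#     "extract_keys": {"items": {"extract_keys": {"discounts": {}}}}
#   }
#
# "flat_keys" entries are nested dicts merged into the parent row as
# customer_<subkey> columns; "extract_keys" entries are nested lists split out
# into child tables (here orders_items and orders_items_discounts), each row
# carrying the order_id join column so the CSVs can be re-joined later.
# ---------------------------------------------------------------------------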