"""Flatten nested JSON files into relational tables.

Reads JSON arrays from --input, samples the first --metadata records to build
a schema, stages rows in SQLite, then exports each table to CSV in --output.
Nested lists become child tables linked back to the parent via --join-column.
"""
import json
import pathlib
import time
import argparse
import sqlite3

import ijson
import pandas as pd

parser = argparse.ArgumentParser()
parser.add_argument("--input", help="folder containing input json(s)", required=True, type=pathlib.Path)
parser.add_argument("--output", help="folder to place csv", required=True, type=pathlib.Path)
parser.add_argument("--delimiter", help="delimiter for CSV (default is '|')", default="|")
parser.add_argument("--single", action="store_true", help="merge all json files into a single output csv")
parser.add_argument("--debug", action="store_true", help="print debug logs")
parser.add_argument("--metadata", type=int, help="how many records to parse for building metadata", default=1000)
parser.add_argument("--join-column", help="join column from the top level used to link nested json", required=True)
parser.add_argument("--name", help="base name for the top-level table and output files", required=True)
args = parser.parse_args()


class DBConn:
    def __init__(self):
        self.cur = None
        self.con = None
        self.counter = 0
        self.ts = ''
        self.table_col_map = {}   # table name -> list of column names
        self.child_tables = {}    # table name -> attributes found in the metadata pass
        self.init_db()

    def init_db(self):
        """Open a fresh SQLite database and (re)create the known tables."""
        if self.con is not None:
            self.con.close()
        self.counter += 1
        self.ts = time.strftime('%Y%m%d_%H%M%S', time.localtime())
        self.con = sqlite3.connect(args.output / f"data-{args.name}-{self.ts}-{self.counter}.db")
        self.cur = self.con.cursor()
        self.make_tables()

    def make_table(self, tbl_name, cols):
        syspk = "syspk integer primary key autoincrement"
        other_cols = ', '.join(f'"{f}" TEXT' for f in cols)
        create_tbl_sql = f'create table if not exists "{tbl_name}" ({syspk}, {other_cols})'
        if args.debug:
            print(f"create sql = {create_tbl_sql}")
        self.cur.execute(create_tbl_sql)
        self.table_col_map[tbl_name] = list(cols)

    def write_to_database(self, tbl, cols):
        keys = cols.keys()
        col_names = ', '.join(f'"{x}"' for x in keys)
        value_placeholders = ', '.join("?" for _ in keys)
        # trim values of surrounding spaces
        values = tuple(str(cols[k]).strip() for k in keys)
        # new columns may appear after the metadata pass; if so, accommodate
        # them by altering the table on the fly
        tbl_cols = self.table_col_map[tbl]
        not_found = [col for col in cols if col not in tbl_cols]
        if not_found:
            for new_col in not_found:
                self.cur.execute(f'alter table "{tbl}" add column "{new_col}" text')
            print(f"added {not_found} cols to {tbl}")
            self.table_col_map[tbl] = tbl_cols + not_found
        sql = f'insert into "{tbl}" ({col_names}) values ({value_placeholders})'
        self.cur.execute(sql, values)

    def make_csv_from_tables(self, prefix=''):
        self.cur.execute("SELECT name FROM sqlite_master WHERE type='table'")
        tbls = [tbl for (tbl,) in self.cur.fetchall() if tbl.startswith(args.name)]
        for tbl in tbls:
            df = pd.read_sql(f'SELECT * FROM "{tbl}"', self.con)
            df.to_csv(args.output / f"{prefix}{tbl}.csv", index=False, sep=args.delimiter)

    def extract_child(self, items_list, current_level, prev_step, existing, step):
        """Walk a list of nested records and collect the schema for its child table."""
        attributes = existing.get("attributes", [args.join_column])
        flat_attributes = existing.get("flat_attributes", [])
        children = existing.get("children", {})
        for child in items_list:
            for (subKey, subValue) in child.items():
                if isinstance(subValue, dict):
                    # flatten one level of a nested object into composite columns
                    if subKey not in flat_attributes:
                        flat_attributes.append(subKey)
                    for sub2Key in subValue:
                        composite_key = f"{current_level}_{sub2Key}"
                        if composite_key not in attributes:
                            attributes.append(composite_key)
                elif isinstance(subValue, list):
                    # a nested list becomes its own child table, one level deeper
                    existing_next_step = children.get(subKey, {})
                    children[subKey] = self.extract_child(subValue, subKey, f"{prev_step}{current_level}_",
                                                          existing_next_step, step + 1)
                else:
                    child_header = f"{current_level}_{subKey}"
                    if child_header not in attributes:
                        attributes.append(child_header)
        self.child_tables[f"{prev_step}{current_level}"] = attributes
        return {"flat_attributes": flat_attributes, "children": children, "attributes": attributes}

    def extract_child_value(self, item, k, ext, prev_step, join_col_value):
        """Write each element of a nested list as one row in its child table."""
        for child in item[k]:
            child_value = {args.join_column: join_col_value}
            for (subKey, subValue) in child.items():
                if subKey in ext.get("children", {}):
                    # deeper nesting: recurse into the grandchild table
                    self.extract_child_value(child, subKey, ext["children"][subKey], f"{prev_step}{k}_", join_col_value)
                elif isinstance(subValue, dict):
                    # flatten one level of a nested object, mirroring extract_child
                    for (sub2Key, sub2Value) in subValue.items():
                        child_value[f"{k}_{sub2Key}"] = sub2Value
                else:
                    child_value[f"{k}_{subKey}"] = subValue
            self.write_to_database(f"{prev_step}{k}", child_value)

    def make_tables(self):
        for (mhKey, mhVal) in self.child_tables.items():
            self.make_table(mhKey, mhVal)

    def parse_json(self):
        children = {}
        flat_attributes = []
        attributes = []
        input_path = args.input
        input_files = [x.name for x in input_path.iterdir() if x.is_file() and x.name.endswith(".json")]
        print(f"found input file(s) {', '.join(input_files)} in path {args.input}")
        if len(input_files) == 0:
            print("could not find any input files, we shall stop")
            return
        parsed = 0
        top_level = args.name
        # first pass: sample records to collect all headers
        print(f"parsing {input_files[0]} for metadata")
        with open(input_path / input_files[0], "rb") as f:
            for topLevelItem in ijson.items(f, "item"):
                if parsed == args.metadata:
                    print(f"parsed {parsed} records for metadata")
                    break
                for (key, value) in topLevelItem.items():
                    if isinstance(value, dict):
                        if key not in flat_attributes:
                            flat_attributes.append(key)
                        for subKey in value:
                            child_attribute = f"{key}_{subKey}"
                            if child_attribute not in attributes:
                                attributes.append(child_attribute)
                    elif isinstance(value, list):
                        existing = children.get(key, {})
                        children[key] = self.extract_child(value, key, f"{top_level}_", existing, 1)
                    else:
                        if key not in attributes:
                            attributes.append(key)
                parsed += 1
        with open(f"schema-{args.name}.json", "w") as schema_file:
            json.dump({
                "attributes": attributes,
                "flat_attributes": flat_attributes,
                "children": children
            }, schema_file, indent=2)
        self.child_tables[top_level] = attributes
        self.make_tables()
        children_names = children.keys()
        # second pass: flatten the original json, stage in sqlite, use pandas to make csv
        for inp_file in input_files:
            if args.debug:
                print(f"processing file {inp_file}")
            with open(input_path / inp_file, "rb") as f:
                for topLevelItem in ijson.items(f, "item"):
                    flat_json = {}
                    for key in topLevelItem.keys():
                        if key in flat_attributes:
                            for (subKey, subValue) in topLevelItem[key].items():
                                flat_json[f"{key}_{subKey}"] = subValue
                        elif key in children_names:
                            self.extract_child_value(topLevelItem, key, children[key], f"{top_level}_",
                                                     topLevelItem[args.join_column])
                        else:
                            flat_json[key] = topLevelItem[key]
                    self.write_to_database(top_level, flat_json)
            if not args.single:
                # one csv set per input file, then start a fresh database
                self.con.commit()
                self.make_csv_from_tables(prefix=f"{(args.output / inp_file).stem}-")
                self.init_db()
        if args.single:
            self.con.commit()
            self.make_csv_from_tables()


if __name__ == '__main__':
    dbConn = DBConn()
    dbConn.parse_json()
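
# A minimal usage sketch (the script filename, folder names, and field names
# below are hypothetical, chosen only for illustration):
#
#   python flatten_json.py --input ./json_in --output ./csv_out \
#       --name clients --join-column client_id
#
# Given an input record such as
#   {"client_id": "42", "profile": {"city": "Oslo"}, "orders": [{"sku": "A1"}]}
# the run would produce a "clients" csv with columns client_id and profile_city
# (plus the syspk auto id), and a "clients_orders" csv whose rows carry
# orders_sku and repeat client_id, so each child csv can be joined back to its
# parent on --join-column.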