import json import os import pathlib import time import shutil import ijson import sqlite3 import pandas as pd import argparse parser = argparse.ArgumentParser() parser.add_argument("--input", help="folder containing input json(s)", required=True, type=pathlib.Path) parser.add_argument("--output", help="folder to place csv", required=True, type=pathlib.Path) parser.add_argument("--delimiter", help="delimiter for CSV (default is '|')", default="|") parser.add_argument("--single", action="store_true", help="merge all json files to single output csv") parser.add_argument("--verbose", '-v', action="count", help="set verbose level", default=0) parser.add_argument("--zip", action="store_true", help="make a zipfile of all outputs") parser.add_argument("--clean", action="store_true", help="clear output directory") parser.add_argument("--metadata", type=int, help="how many records to parse for building metadata", default=1000) parser.add_argument("--join-column", help="join column from top-level to merge nested json", required=True) parser.add_argument("--name", help="base name, to be used in creating all data", required=True) args = parser.parse_args() def make_archive(source: pathlib.Path, destination: pathlib.Path) -> None: base_name = destination.parent / destination.stem fmt = destination.suffix.replace(".", "") root_dir = source.parent base_dir = source.name shutil.make_archive(str(base_name), fmt, root_dir, base_dir) class JsonToCsv: def __init__(self): self.cur = None self.con = None self.counter = 0 self.ts = '' self.table_mapping = {} self.init_db() def init_db(self): self.counter += 1 self.ts = time.strftime('%Y%m%d-%H%M%S', time.localtime()) db_name = args.output / f"data-{args.name}-{self.ts}-{self.counter}.db" if args.verbose > 0: print(f"creating DB {db_name}") self.con = sqlite3.connect(db_name) self.cur = self.con.cursor() self.make_tables() def make_table(self, tbl_name, cols): syspk = "syspk integer primary key autoincrement" other_cols = ', '.join([f"\"{f}\" TEXT" for f in cols]) create_tbl_sql = f"create table if not exists \"{tbl_name}\" ({syspk}, {other_cols})" if args.verbose >= 1: print(f"create sql = {create_tbl_sql}") self.cur.execute(create_tbl_sql) def write_to_database(self, tbl, column_data): col_names = list(column_data.keys()) # OrderedDict col_names_placeholder = ', '.join( [f"\"{x}\"" for x in col_names] ) value_placeholders = ', '.join( ["?" for _ in col_names] ) # trim values of spaces values = tuple([str(column_data[k]).strip() for k in col_names]) # there might be a scenario that new cols might appear at a later stage, if so, # we shall accommodate by adding it to the database tbl_cols = self.table_mapping[tbl] if tbl_cols != col_names: not_found = [] for col in col_names: if col not in tbl_cols: not_found.append(col) if len(not_found) > 0: if args.verbose >= 1: print(f"added new cols {', '.join(not_found)} to {tbl}") new_cols = list(tbl_cols) for new_col in not_found: self.cur.execute(f"alter table \"{tbl}\" add column \"{new_col}\" text") new_cols.append(new_col) self.table_mapping[tbl] = new_cols sql = f"insert into \"{tbl}\" ({col_names_placeholder}) values({value_placeholders})" if args.verbose == 2: print(f"insert sql = {sql}") self.cur.execute(sql, values) def make_csv_from_tables(self, prefix=''): json_csv.cur.execute("SELECT name FROM sqlite_master WHERE type='table'") tbls = [] for (tbl,) in json_csv.cur.fetchall(): if tbl.find(args.name) == 0: tbls.append(tbl) for tbl in tbls: clients = pd.read_sql(f"SELECT * FROM \"{tbl}\"", self.con) clients.to_csv(args.output / f"{prefix}{tbl}.csv", index=False, sep=args.delimiter) def extract_child(self, items_list, current_level, prev_step, existing): attributes = existing.get("attributes", [args.join_column]) flat_attributes = existing.get("flat_attributes", []) children = existing.get("children", {}) for child_idx in range(len(items_list)): child = items_list[child_idx] for (subKey, subValue) in child.items(): is_dict = isinstance(subValue, dict) is_list = isinstance(subValue, list) if is_dict: if subKey not in flat_attributes: flat_attributes.append(subKey) for (sub2Key, sub2Value) in subValue.items(): composite_key = f"{current_level}_{sub2Key}" if composite_key not in attributes: attributes.append(composite_key) if is_list: if subKey not in children.keys(): existing_next_step = children.get(subKey, {}) children[subKey] = self.extract_child(subValue, subKey, f"{prev_step}{current_level}_", existing_next_step) else: child_header = f"{current_level}_{subKey}" if child_header not in attributes: attributes.append(child_header) self.table_mapping[f"{prev_step}{current_level}"] = attributes return {"flat_attributes": flat_attributes, "children": children, "attributes": attributes} def extract_child_value(self, item, current_level, ext, prev_step, prev_step_seq, join_col_value): k_ = f"{prev_step}{current_level}" for seqNo, child in enumerate(item[current_level]): child_value = { args.join_column: join_col_value, f"{prev_step}seq": prev_step_seq, f"{current_level}_seq": seqNo + 1 } for (subKey, subValue) in child.items(): is_ext = subKey in ext.get("children", {}).keys() if is_ext: self.extract_child_value(child, subKey, ext["children"][subKey], f"{prev_step}{current_level}_", seqNo + 1, join_col_value) else: child_header = f"{current_level}_{subKey}" child_value[child_header] = subValue self.write_to_database(k_, child_value) def make_tables(self): if args.verbose >= 1: print("making tables...") for (mhKey, mhVal) in self.table_mapping.items(): print(f"table {mhKey}, cols {mhVal}") for (mhKey, mhVal) in self.table_mapping.items(): self.make_table(mhKey, mhVal) def parse_json(self): children = {} flat_attributes = [] attributes = [] input_path = args.input input_files = [x.name for x in input_path.iterdir() if not x.is_dir() and x.is_file() and x.name.endswith(".json")] print(f"found input file(s) {', '.join(input_files)} in path {args.input}") if len(input_files) == 0: print("could not find any input files, we shall stop") return parsed = 0 top_level = args.name # first pass, collect all headers print(f"parsing {input_files[0]} for metadata") for topLevelItem in ijson.items(open(input_path / input_files[0]), "item"): if parsed == args.metadata: print(f"parsed {parsed} records for metadata") break for (key, value) in topLevelItem.items(): value_is_dict = isinstance(value, dict) value_is_list = isinstance(value, list) if value_is_dict: if key not in flat_attributes: flat_attributes.append(key) for (subKey, subValue) in value.items(): child_attribute = f"{key}_{subKey}" if child_attribute not in attributes: attributes.append(child_attribute) elif value_is_list: existing = children.get(key, {}) children[key] = self.extract_child(value, key, f"{top_level}_", existing) else: if key not in attributes: attributes.append(key) parsed += 1 open(f"schema-{args.name}.json", "w").write( json.dumps({ "attributes": attributes, "flat_attributes": flat_attributes, "children": children }, indent=2) ) self.table_mapping[top_level] = attributes self.make_tables() children_names = children.keys() top_level_idx = 0 # second pass, make flat json from original-json, put to sqlite, use pandas to make csv for inp_file in input_files: if args.verbose >= 1: print(f"processing file {inp_file}") for topLevelItem in ijson.items(open(input_path / inp_file), "item"): keys = topLevelItem.keys() flat_json = {f"{top_level}_seq": top_level_idx + 1} for key in keys: if key in flat_attributes: for (subKey, subValue) in topLevelItem[key].items(): child_attribute = f"{key}_{subKey}" flat_json[child_attribute] = subValue elif key in children_names: ext = children[key] self.extract_child_value(topLevelItem, key, ext, f"{top_level}_", top_level_idx + 1, topLevelItem[args.join_column]) else: flat_json[key] = topLevelItem[key] self.write_to_database(top_level, flat_json) top_level_idx += 1 if not args.single: self.con.commit() self.make_csv_from_tables(prefix=f"{pathlib.Path(args.output / inp_file).stem}-") self.init_db() if args.single: self.con.commit() self.make_csv_from_tables() if args.zip: make_archive(args.output, pathlib.Path(f"{top_level}.zip")) if __name__ == '__main__': if args.verbose >= 1: print(f"args = {args}") if args.clean: for d in args.output.iterdir(): print(f"will delete {d}") os.unlink(d) json_csv = JsonToCsv() json_csv.parse_json()