# json_to_csv/main.py
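"""Stream a folder of (possibly nested) JSON files into SQLite, then export CSVs.

Each top-level record becomes a row in the table named by --name; nested lists
become child tables that join back to the parent via --join-column.
"""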
import argparse
import json
import pathlib
import shutil
import sqlite3
import time

import ijson
import pandas as pd

parser = argparse.ArgumentParser()
parser.add_argument("--input", help="folder containing input json(s)", required=True, type=pathlib.Path)
parser.add_argument("--output", help="folder to place csv", required=True, type=pathlib.Path)
parser.add_argument("--delimiter", help="delimiter for CSV (default is '|'", default="|")
parser.add_argument("--single", action="store_true", help="merge all json files to single output csv")
parser.add_argument("--debug", action="store_true", help="print debug logs")
parser.add_argument("--zip", action="store_true", help="make a zipfile of all outputs")
parser.add_argument("--metadata", type=int, help="how many records to parse for building metadata", default=1000)
parser.add_argument("--join-column", help="join column from top-level to merge nested json", required=True)
parser.add_argument("--name", help="base name, to be used in creating all data", required=True)
args = parser.parse_args()
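
# Example invocation (paths and values below are hypothetical):
#   python main.py --input ./json_in --output ./csv_out --name orders --join-column order_id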


def make_archive(source: pathlib.Path, destination: pathlib.Path) -> None:
    """Pack the whole `source` directory into `destination`; format comes from its suffix."""
    base_name = destination.parent / destination.stem
    fmt = destination.suffix.replace(".", "")
    root_dir = source.parent
    base_dir = source.name
    shutil.make_archive(str(base_name), fmt, str(root_dir), base_dir)
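
# A sketch of the call issued when --zip is set (names illustrative):
#   make_archive(pathlib.Path("csv_out"), pathlib.Path("orders.zip"))
# zips the csv_out folder into orders.zip in the current working directory.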


class DBConn:
    def __init__(self):
        self.cur = None
        self.con = None
        self.counter = 0
        self.ts = ''
        self.table_mapping = {}
        self.init_db()

    def init_db(self):
        """Open a fresh SQLite staging database (one per input file unless --single)."""
        if self.con is not None:
            self.con.close()
        self.counter += 1
        self.ts = time.strftime('%Y%m%d_%H%M%S', time.localtime())
        self.con = sqlite3.connect(args.output / f"data-{args.name}-{self.ts}-{self.counter}.db")
        self.cur = self.con.cursor()
        self.make_tables()

    def make_table(self, tbl_name, cols):
        syspk = "syspk integer primary key autoincrement"
        other_cols = ', '.join(f'"{f}" TEXT' for f in cols)
        create_tbl_sql = f'create table if not exists "{tbl_name}" ({syspk}, {other_cols})'
        if args.debug:
            print(f"create sql = {create_tbl_sql}")
        self.cur.execute(create_tbl_sql)
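
    # For cols ["id", "address_city"] and a (hypothetical) table "orders", the DDL is:
    #   create table if not exists "orders"
    #     (syspk integer primary key autoincrement, "id" TEXT, "address_city" TEXT)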

    def write_to_database(self, tbl, column_data):
        col_names = list(column_data.keys())  # dicts preserve insertion order (3.7+)
        col_names_placeholder = ', '.join(f'"{x}"' for x in col_names)
        value_placeholders = ', '.join("?" for _ in col_names)
        # trim surrounding whitespace from values
        values = tuple(str(column_data[k]).strip() for k in col_names)
        # new columns may appear in later records; if so, widen the table on the fly
        tbl_cols = self.table_mapping[tbl]
        if tbl_cols != col_names:
            not_found = [col for col in col_names if col not in tbl_cols]
            if not_found:
                for new_col in not_found:
                    self.cur.execute(f'alter table "{tbl}" add column "{new_col}" text')
                print(f"added new cols {', '.join(not_found)} to {tbl}")
                # extend rather than replace, so previously seen columns are not forgotten
                self.table_mapping[tbl] = tbl_cols + not_found
        sql = f'insert into "{tbl}" ({col_names_placeholder}) values ({value_placeholders})'
        if args.debug:
            print(f"insert sql = {sql}")
        self.cur.execute(sql, values)
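
    # e.g. if the first record has keys (id, city) and a later one adds "zip", the path
    # above issues: alter table "orders" add column "zip" text  (table name illustrative)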

    def make_csv_from_tables(self, prefix=''):
        """Export every table whose name starts with --name to a CSV in --output."""
        self.cur.execute("SELECT name FROM sqlite_master WHERE type='table'")
        tbls = [tbl for (tbl,) in self.cur.fetchall() if tbl.startswith(args.name)]
        for tbl in tbls:
            frame = pd.read_sql(f'SELECT * FROM "{tbl}"', self.con)
            frame.to_csv(args.output / f"{prefix}{tbl}.csv", index=False, sep=args.delimiter)

    def extract_child(self, items_list, current_level, prev_step, existing):
        """Metadata pass over a nested list: collect the column names for its child table."""
        attributes = existing.get("attributes", [args.join_column])
        flat_attributes = existing.get("flat_attributes", [])
        children = existing.get("children", {})
        for child in items_list:
            for (subKey, subValue) in child.items():
                if isinstance(subValue, dict):
                    # one dict level is flattened into prefixed columns
                    if subKey not in flat_attributes:
                        flat_attributes.append(subKey)
                    for sub2Key in subValue.keys():
                        composite_key = f"{current_level}_{sub2Key}"
                        if composite_key not in attributes:
                            attributes.append(composite_key)
                elif isinstance(subValue, list):
                    # recurse: every nested list gets a table of its own, merging
                    # schemas across repeated occurrences of the same key
                    existing_next_step = children.get(subKey, {})
                    children[subKey] = self.extract_child(subValue, subKey,
                                                          f"{prev_step}{current_level}_",
                                                          existing_next_step)
                else:
                    child_header = f"{current_level}_{subKey}"
                    if child_header not in attributes:
                        attributes.append(child_header)
        self.table_mapping[f"{prev_step}{current_level}"] = attributes
        return {"flat_attributes": flat_attributes, "children": children, "attributes": attributes}

    def extract_child_value(self, item, k, ext, prev_step, join_col_value):
        """Second pass: write one row per element of the nested list item[k]."""
        for child in item[k]:
            child_value = {args.join_column: join_col_value}
            for (subKey, subValue) in child.items():
                if subKey in ext.get("children", {}):
                    self.extract_child_value(child, subKey, ext["children"][subKey],
                                             f"{prev_step}{k}_",
                                             join_col_value)
                elif subKey in ext.get("flat_attributes", []) and isinstance(subValue, dict):
                    # flatten one dict level, mirroring extract_child above
                    for (sub2Key, sub2Value) in subValue.items():
                        child_value[f"{k}_{sub2Key}"] = sub2Value
                else:
                    child_value[f"{k}_{subKey}"] = subValue
            self.write_to_database(f"{prev_step}{k}", child_value)

    def make_tables(self):
        for (mhKey, mhVal) in self.table_mapping.items():
            self.make_table(mhKey, mhVal)

    def parse_json(self):
        children = {}
        flat_attributes = []
        attributes = []
        input_path = args.input
        input_files = [x.name for x in input_path.iterdir()
                       if x.is_file() and x.suffix == ".json"]
        if len(input_files) == 0:
            print("could not find any input files, we shall stop")
            return
        print(f"found input file(s) {', '.join(input_files)} in path {args.input}")
        parsed = 0
        top_level = args.name
        # first pass: stream the first file and collect all headers
        print(f"parsing {input_files[0]} for metadata")
        with open(input_path / input_files[0]) as f:
            for topLevelItem in ijson.items(f, "item"):
                if parsed == args.metadata:
                    print(f"parsed {parsed} records for metadata")
                    break
                for (key, value) in topLevelItem.items():
                    if isinstance(value, dict):
                        if key not in flat_attributes:
                            flat_attributes.append(key)
                        for subKey in value.keys():
                            child_attribute = f"{key}_{subKey}"
                            if child_attribute not in attributes:
                                attributes.append(child_attribute)
                    elif isinstance(value, list):
                        existing = children.get(key, {})
                        children[key] = self.extract_child(value, key, f"{top_level}_", existing)
                    elif key not in attributes:
                        attributes.append(key)
                parsed += 1
        # persist the discovered schema next to the script for inspection
        with open(f"schema-{args.name}.json", "w") as schema_file:
            json.dump({
                "attributes": attributes,
                "flat_attributes": flat_attributes,
                "children": children
            }, schema_file, indent=2)
        self.table_mapping[top_level] = attributes
        self.make_tables()
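
        # only the first file is scanned for metadata; if later files introduce brand-new
        # keys, the ALTER-on-the-fly path in write_to_database absorbs them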
        children_names = children.keys()
        # second pass: flatten each record, stage rows in sqlite, export CSVs via pandas
        for inp_file in input_files:
            if args.debug:
                print(f"processing file {inp_file}")
            with open(input_path / inp_file) as f:
                for topLevelItem in ijson.items(f, "item"):
                    flat_json = {}
                    for key in topLevelItem.keys():
                        if key in flat_attributes:
                            for (subKey, subValue) in topLevelItem[key].items():
                                flat_json[f"{key}_{subKey}"] = subValue
                        elif key in children_names:
                            self.extract_child_value(topLevelItem, key, children[key],
                                                     f"{top_level}_",
                                                     topLevelItem[args.join_column])
                        else:
                            flat_json[key] = topLevelItem[key]
                    self.write_to_database(top_level, flat_json)
            if not args.single:
                self.con.commit()
                self.make_csv_from_tables(prefix=f"{pathlib.Path(inp_file).stem}-")
                self.init_db()
        if args.single:
            self.con.commit()
            self.make_csv_from_tables()
        if args.zip:
            make_archive(args.output, pathlib.Path(f"{top_level}.zip"))


if __name__ == '__main__':
    dbConn = DBConn()
    dbConn.parse_json()