# json_to_csv/main.py
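"""Flatten large JSON files into CSVs via an intermediate SQLite database.

Each input file is expected to contain a top-level JSON array; records are
streamed with ijson so files never have to fit in memory. A schema JSON
decides how each top-level key of a record is handled:

* keys listed under "flat_keys" are dict-valued and are flattened into
  "<key>_<field>" columns on the parent table;
* keys listed under "extract_keys" are lists of nested objects written to
  separate child tables, joined back to the parent via --join-column;
* every other key becomes a plain column on the parent table.

One CSV is produced per table, either per input file or merged across all
input files when --single is given.
"""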

import json
import pathlib
import time
import ijson
import sqlite3
import pandas as pd
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--input", help="folder containing input json(s)", required=True, type=pathlib.Path)
parser.add_argument("--output", help="folder to place csv", required=True, type=pathlib.Path)
parser.add_argument("--schema", help="schema json", required=True, type=pathlib.Path)
parser.add_argument("--single", action="store_true", help="merge all json files to single output csv")
parser.add_argument("--metadata", type=int, help="how many records to parse for building metadata", default=1000)
parser.add_argument("--join-column", help="join column from top-level to merge nested json", required=True)
parser.add_argument("--name", help="join column from top-level to merge nested json", required=True)
args = parser.parse_args()
schema = json.load(open(args.schema))
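
# Illustrative schema shape (the field names below are made up; only the
# "flat_keys"/"extract_keys" structure is what the code expects):
#
# {
#   "flat_keys": ["address"],
#   "extract_keys": {
#     "orders": {
#       "extract_keys": {
#         "items": {}
#       }
#     }
#   }
# }
#
# "flat_keys" lists dict-valued keys; each entry under "extract_keys" maps a
# list-valued key to a config dict that may itself contain another
# "extract_keys" mapping for deeper nesting.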


class DBConn:
    """Thin wrapper around the intermediate SQLite database."""

    def __init__(self):
        self.cur = None
        self.con = None
        self.counter = 0
        self.ts = ''
        self.reinit_db()

    def reinit_db(self):
        """Open a fresh database file; called once per input file unless --single is set."""
        self.counter += 1
        self.ts = time.strftime('%Y%m%d_%H%M%S', time.localtime())
        self.con = sqlite3.connect(args.output / f"data-{args.name}-{self.ts}-{self.counter}.db")
        self.cur = self.con.cursor()

    def make_table(self, tbl_name, cols):
        """Create a table with a synthetic primary key and one TEXT column per header."""
        syspk = "syspk integer primary key autoincrement"
        other_cols = ', '.join([f"{f} TEXT" for f in cols])
        create_tbl_sql = f"create table if not exists {tbl_name} ({syspk}, {other_cols})"
        self.cur.execute(create_tbl_sql)

    def write_to_database(self, tbl, cols):
        """Insert one row; `cols` maps column name -> value."""
        keys = cols.keys()  # insertion order keeps names and values aligned
        col_names = ', '.join(keys)
        value_placeholders = ', '.join("?" for _ in keys)
        # store everything as trimmed text
        values = tuple(str(cols[k]).strip() for k in keys)
        sql = f"insert into {tbl} ({col_names}) values({value_placeholders})"
        self.cur.execute(sql, values)

    def make_csv_from_tables(self, prefix=''):
        """Dump every table created by this run to a CSV file in the output folder."""
        self.cur.execute("SELECT name FROM sqlite_master WHERE type='table'")
        tbls = []
        for (tbl,) in self.cur.fetchall():
            # export only our own tables (the parent table and its children),
            # skipping SQLite's internal tables such as sqlite_sequence
            if tbl.startswith(args.name):
                tbls.append(tbl)
        for tbl in tbls:
            df = pd.read_sql(f"SELECT * FROM {tbl}", self.con)
            df.to_csv(args.output / f"{prefix}{tbl}.csv", index=False)


dbConn = DBConn()


def extract_child(merge_headers, item, k, ext, prev_step):
    """First pass: collect column headers for the child table of nested key `k`."""
    # headers are keyed by the fully prefixed table name (e.g. "<name>_<k>"),
    # so look them up under that name and accumulate across records
    child_headers = merge_headers.get(f"{prev_step}{k}", [args.join_column])
    for child in item[k]:
        for (subKey, subValue) in child.items():
            is_ext = subKey in ext.get("extract_keys", {}).keys()
            if is_ext:
                # nested list-of-objects: recurse into its own child table
                extract_child(merge_headers, child, subKey, ext["extract_keys"][subKey], f"{prev_step}{k}_")
            else:
                child_header = f"{k}_{subKey}"
                if child_header not in child_headers:
                    child_headers.append(child_header)
    merge_headers[f"{prev_step}{k}"] = child_headers


def extract_child_value(merge_headers, item, k, ext, prev_step, join_col_value):
    """Second pass: write one row per nested element of `k` into its child table."""
    for child in item[k]:
        # build a fresh row per nested element, carrying the parent's join-column value
        child_value = {args.join_column: join_col_value}
        for (subKey, subValue) in child.items():
            is_ext = subKey in ext.get("extract_keys", {}).keys()
            if is_ext:
                extract_child_value(merge_headers, child, subKey, ext["extract_keys"][subKey], f"{prev_step}{k}_",
                                    join_col_value)
            else:
                child_header = f"{k}_{subKey}"
                child_value[child_header] = subValue
        k_ = f"{prev_step}{k}"
        dbConn.write_to_database(k_, child_value)


def parse_json():
    """Two-pass conversion: first pass builds headers/tables, second pass writes rows."""
    extract_keys = schema["extract_keys"]
    flat_keys = schema["flat_keys"]
    extract_keys_names = extract_keys.keys()
    headers = []
    merge_headers = {}
    input_path = args.input
    input_files = [x.name for x in input_path.iterdir() if x.is_file() and x.name.endswith(".json")]
    print(f"found input file(s) {', '.join(input_files)} in path {args.input}")
    if len(input_files) == 0:
        print("could not find any input files, stopping")
        return
    parsed = 0
    parent = args.name
    # first pass: scan up to --metadata records of the first file to collect all headers
    for item in ijson.items(open(input_path / input_files[0]), "item"):
        if parsed > args.metadata:
            print(f"parsed {parsed} records for metadata")
            break
        keys = item.keys()
        for k in keys:
            if k in flat_keys:
                for (fk, fv) in item[k].items():
                    composite_key = f"{k}_{fk}"
                    if composite_key not in headers:
                        headers.append(composite_key)
            elif k in extract_keys_names:
                ext = extract_keys[k]
                extract_child(merge_headers, item, k, ext, f"{parent}_")
            else:
                if k not in headers:
                    headers.append(k)
        parsed += 1
    dbConn.make_table(parent, headers)
    for (mhKey, mhVal) in merge_headers.items():
        dbConn.make_table(mhKey, mhVal)
    # second pass: flatten every record and write it out
    # (sqlite is used as the intermediate store because it is faster to write)
    for inp_file in input_files:
        for item in ijson.items(open(input_path / inp_file), "item"):
            keys = item.keys()
            flat_json = {}
            for k in keys:
                if k in flat_keys:
                    for (fk, fv) in item[k].items():
                        composite_key = f"{k}_{fk}"
                        flat_json[composite_key] = fv
                elif k in extract_keys_names:
                    ext = extract_keys[k]
                    extract_child_value(merge_headers, item, k, ext, f"{parent}_", item[args.join_column])
                else:
                    flat_json[k] = item[k]
            dbConn.write_to_database(parent, flat_json)
        if not args.single:
            # one set of CSVs per input file, then start a fresh database for the next one
            dbConn.con.commit()
            dbConn.make_csv_from_tables(prefix=f"{pathlib.Path(args.output / inp_file).stem}-")
            dbConn.reinit_db()
            dbConn.make_table(parent, headers)
            for (mhKey, mhVal) in merge_headers.items():
                dbConn.make_table(mhKey, mhVal)
    if args.single:
        # everything went into one database; dump it once at the end
        dbConn.con.commit()
        dbConn.make_csv_from_tables()
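

# Example invocation (paths and the --join-column/--name values are placeholders):
#   python main.py --input ./json --output ./csv --schema schema.json \
#       --join-column id --name customers --single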
if __name__ == '__main__':
    parse_json()