Compare commits


No commits in common. "5ceecc6bf49056a1dc56ba4d0c956b29c1652cfd" and "b7668d19b9a2a1c32cb2f8d3b4cb126fc6ca8bdd" have entirely different histories.

main.py

@@ -1,5 +1,4 @@
 import json
-import os
 import pathlib
 import time
 import shutil
@@ -14,9 +13,8 @@ parser.add_argument("--input", help="folder containing input json(s)", required=
 parser.add_argument("--output", help="folder to place csv", required=True, type=pathlib.Path)
 parser.add_argument("--delimiter", help="delimiter for CSV (default is '|')", default="|")
 parser.add_argument("--single", action="store_true", help="merge all json files to single output csv")
-parser.add_argument("--verbose", '-v', action="count", help="set verbose level", default=0)
+parser.add_argument("--debug", action="store_true", help="print debug logs")
 parser.add_argument("--zip", action="store_true", help="make a zipfile of all outputs")
-parser.add_argument("--clean", action="store_true", help="clear output directory")
 parser.add_argument("--metadata", type=int, help="how many records to parse for building metadata", default=1000)
 parser.add_argument("--join-column", help="join column from top-level to merge nested json", required=True)
 parser.add_argument("--name", help="base name, to be used in creating all data", required=True)
@@ -52,7 +50,7 @@ class DBConn:
         syspk = "syspk integer primary key autoincrement"
         other_cols = ', '.join([f"\"{f}\" TEXT" for f in cols])
         create_tbl_sql = f"create table if not exists \"{tbl_name}\" ({syspk}, {other_cols})"
-        if args.verbose >= 1:
+        if args.debug:
             print(f"create sql = {create_tbl_sql}")
         self.cur.execute(create_tbl_sql)
@@ -78,18 +76,14 @@ class DBConn:
                 not_found.append(col)
         if len(not_found) > 0:
-            if args.verbose >= 1:
-                print(
-                    f"added new cols {', '.join(not_found)} to {tbl}, already present {tbl_cols}, want {col_names}")
-            new_cols = list(tbl_cols)
             for new_col in not_found:
                 self.cur.execute(f"alter table \"{tbl}\" add column \"{new_col}\" text")
-                new_cols.append(new_col)
-            self.table_mapping[tbl] = new_cols
+            print(f"added new cols {', '.join(not_found)} to {tbl}")
+            self.table_mapping[tbl] = list(col_names)
         sql = f"insert into \"{tbl}\" ({col_names_placeholder}) values({value_placeholders})"
-        if args.verbose == 2:
+        if args.debug:
             print(f"insert sql = {sql}")
         self.cur.execute(sql, values)
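This hunk simplifies the schema-evolution path: missing columns are still added with ALTER TABLE, but the verbose logging and the incremental new_cols bookkeeping are gone, and the cached mapping is reset to the incoming column list. A minimal standalone sketch of the add-missing-columns pattern, using an in-memory database and made-up table/column names:

import sqlite3

con = sqlite3.connect(":memory:")
cur = con.cursor()
cur.execute('create table if not exists "t" (syspk integer primary key autoincrement, "a" TEXT)')

# columns the table already has; pragma table_info rows are (cid, name, type, ...)
existing = [row[1] for row in cur.execute('pragma table_info("t")')]
incoming = ["a", "b", "c"]  # columns the next record wants to insert

not_found = [c for c in incoming if c not in existing]
for new_col in not_found:
    cur.execute(f'alter table "t" add column "{new_col}" text')

print(not_found)  # ['b', 'c']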
@@ -139,31 +133,23 @@ class DBConn:
         self.table_mapping[f"{prev_step}{current_level}"] = attributes
         return {"flat_attributes": flat_attributes, "children": children, "attributes": attributes}
 
-    def extract_child_value(self, item, current_level, ext, prev_step, prev_step_seq, join_col_value):
-        k_ = f"{prev_step}{current_level}"
-        for seqNo, child in enumerate(item[current_level]):
-            child_value = {
-                args.join_column: join_col_value,
-                f"{prev_step}seq": prev_step_seq,
-                f"{current_level}_seq": seqNo + 1
-            }
+    def extract_child_value(self, item, k, ext, prev_step, join_col_value):
+        child_value = {args.join_column: join_col_value}
+        for child in item[k]:
             for (subKey, subValue) in child.items():
                 is_ext = subKey in ext.get("children", {}).keys()
                 if is_ext:
                     self.extract_child_value(child, subKey, ext["children"][subKey],
-                                             f"{prev_step}{current_level}_",
-                                             seqNo + 1, join_col_value)
+                                             f"{prev_step}{k}_",
+                                             join_col_value)
                 else:
-                    child_header = f"{current_level}_{subKey}"
+                    child_header = f"{k}_{subKey}"
                     child_value[child_header] = subValue
-            self.write_to_database(k_, child_value)
+            k_ = f"{prev_step}{k}"
+            dbConn.write_to_database(k_, child_value)
 
     def make_tables(self):
-        if args.verbose >= 1:
-            print("making tables...")
-            for (mhKey, mhVal) in self.table_mapping.items():
-                print(f"table {mhKey}, cols {mhVal}")
         for (mhKey, mhVal) in self.table_mapping.items():
             self.make_table(mhKey, mhVal)
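extract_child_value loses its sequence bookkeeping (prev_step_seq and the per-child _seq counters) and keys child rows by the join column alone, with recursion handling deeper nesting. A standalone sketch of that flattening idea, using an isinstance check on lists in place of the script's precomputed metadata and made-up names throughout:

rows = {}  # table name -> list of flat rows, standing in for the sqlite writes

def extract(item, k, prev_step, join_col_value):
    for child in item[k]:
        flat = {"id": join_col_value}  # every child row carries the join column
        for sub_key, sub_value in child.items():
            if isinstance(sub_value, list):  # nested list -> its own table, deeper prefix
                extract(child, sub_key, f"{prev_step}{k}_", join_col_value)
            else:
                flat[f"{k}_{sub_key}"] = sub_value
        rows.setdefault(f"{prev_step}{k}", []).append(flat)

doc = {"id": 1, "orders": [{"sku": "x", "lines": [{"qty": 2}]}]}
extract(doc, "orders", "top_", doc["id"])
print(rows)
# {'top_orders_lines': [{'id': 1, 'lines_qty': 2}], 'top_orders': [{'id': 1, 'orders_sku': 'x'}]}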
@@ -225,16 +211,15 @@ class DBConn:
         children_names = children.keys()
-        top_level_idx = 0
         # second pass, make flat json from original-json, put to sqlite, use pandas to make csv
         for inp_file in input_files:
-            if args.verbose >= 1:
+            if args.debug:
                 print(f"processing file {inp_file}")
             for topLevelItem in ijson.items(open(input_path / inp_file), "item"):
                 keys = topLevelItem.keys()
-                flat_json = {f"{top_level}_seq": top_level_idx + 1}
+                flat_json = {}
                 for key in keys:
                     if key in flat_attributes:
                         for (subKey, subValue) in topLevelItem[key].items():
@@ -242,16 +227,12 @@ class DBConn:
                             flat_json[child_attribute] = subValue
                     elif key in children_names:
                         ext = children[key]
-                        self.extract_child_value(topLevelItem,
-                                                 key,
-                                                 ext, f"{top_level}_",
-                                                 top_level_idx + 1,
-                                                 topLevelItem[args.join_column])
+                        self.extract_child_value(topLevelItem, key, ext, f"{top_level}_",
+                                                 topLevelItem[args.join_column])
                     else:
                         flat_json[key] = topLevelItem[key]
                 self.write_to_database(top_level, flat_json)
-                top_level_idx += 1
             if not args.single:
                 self.con.commit()
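For context on the surrounding loop (unchanged here apart from the dropped sequence counter): each input file is streamed with ijson, so a large top-level JSON array never has to fit in memory. A minimal standalone example of that pattern, with an in-memory stream standing in for a file:

import io
import ijson

# the "item" prefix addresses each element of a top-level JSON array
stream = io.StringIO('[{"id": 1}, {"id": 2}]')
for obj in ijson.items(stream, "item"):
    print(obj["id"])  # 1, then 2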
@@ -269,12 +250,4 @@ class DBConn:
 dbConn = DBConn()
 
 if __name__ == '__main__':
-    if args.verbose >= 1:
-        print(f"args = {args}")
-    if args.clean:
-        for d in args.output.glob("*.csv"):
-            print(f"will delete {d}")
-            os.unlink(d)
     dbConn.parse_json()
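With --verbose and --clean gone, an invocation after this change might look like the following (folder and base names are made up for illustration; --input, --output, --join-column, and --name remain required):

python main.py --input ./json_in --output ./csv_out --join-column id --name export --debug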