Compare commits
6 Commits: b7668d19b9...master
| Author | SHA1 | Date |
|---|---|---|
| | 882399f0b3 | |
| | 4c8de7dcf5 | |
| | d171652b2a | |
| | 4df0f6239b | |
| | 5ceecc6bf4 | |
| | 72c13c353f | |
main.py (84 changed lines)
```diff
@@ -1,4 +1,5 @@
 import json
+import os
 import pathlib
 import time
 import shutil
```
```diff
@@ -11,10 +12,11 @@ import argparse
 parser = argparse.ArgumentParser()
 parser.add_argument("--input", help="folder containing input json(s)", required=True, type=pathlib.Path)
 parser.add_argument("--output", help="folder to place csv", required=True, type=pathlib.Path)
-parser.add_argument("--delimiter", help="delimiter for CSV (default is '|'", default="|")
+parser.add_argument("--delimiter", help="delimiter for CSV (default is '|')", default="|")
 parser.add_argument("--single", action="store_true", help="merge all json files to single output csv")
-parser.add_argument("--debug", action="store_true", help="print debug logs")
+parser.add_argument("--verbose", '-v', action="count", help="set verbose level", default=0)
 parser.add_argument("--zip", action="store_true", help="make a zipfile of all outputs")
+parser.add_argument("--clean", action="store_true", help="clear output directory")
 parser.add_argument("--metadata", type=int, help="how many records to parse for building metadata", default=1000)
 parser.add_argument("--join-column", help="join column from top-level to merge nested json", required=True)
 parser.add_argument("--name", help="base name, to be used in creating all data", required=True)
```
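The `--debug` switch becomes a counted `--verbose`/`-v` option. A minimal sketch of how argparse's `count` action produces the levels the rest of the diff tests (`> 0`, `>= 1`, `== 2`); the demo invocations are illustrative only:

```python
import argparse

# Stand-alone parser mirroring just the changed argument from the diff.
parser = argparse.ArgumentParser()
parser.add_argument("--verbose", '-v', action="count", help="set verbose level", default=0)

print(parser.parse_args([]).verbose)       # 0: quiet (old behaviour without --debug)
print(parser.parse_args(["-v"]).verbose)   # 1: create SQL, file progress, table listing
print(parser.parse_args(["-vv"]).verbose)  # 2: per-row insert SQL as well
```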
```diff
@@ -30,7 +32,7 @@ def make_archive(source: pathlib.Path, destination: pathlib.Path) -> None:
     shutil.make_archive(str(base_name), fmt, root_dir, base_dir)


-class DBConn:
+class JsonToCsv:
     def __init__(self):
         self.cur = None
         self.con = None
```
```diff
@@ -41,8 +43,11 @@ class DBConn:

     def init_db(self):
         self.counter += 1
-        self.ts = time.strftime('%Y%m%d_%H%M%S', time.localtime())
-        self.con = sqlite3.connect(args.output / f"data-{args.name}-{self.ts}-{self.counter}.db")
+        self.ts = time.strftime('%Y%m%d-%H%M%S', time.localtime())
+        db_name = args.output / f"data-{args.name}-{self.ts}-{self.counter}.db"
+        if args.verbose > 0:
+            print(f"creating DB {db_name}")
+        self.con = sqlite3.connect(db_name)
         self.cur = self.con.cursor()
         self.make_tables()

```
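The database file name is now built once, optionally logged, and the timestamp separator changes from `_` to `-`. A quick check of the new format, assuming the same `time.strftime` call as above:

```python
import time

# New format, e.g. '20240131-235959' (was '20240131_235959').
ts = time.strftime('%Y%m%d-%H%M%S', time.localtime())
# 'report' and the trailing counter are placeholders for args.name and self.counter.
print(f"data-report-{ts}-1.db")
```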
```diff
@@ -50,7 +55,7 @@ class DBConn:
         syspk = "syspk integer primary key autoincrement"
         other_cols = ', '.join([f"\"{f}\" TEXT" for f in cols])
         create_tbl_sql = f"create table if not exists \"{tbl_name}\" ({syspk}, {other_cols})"
-        if args.debug:
+        if args.verbose >= 1:
             print(f"create sql = {create_tbl_sql}")
         self.cur.execute(create_tbl_sql)

```
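Because table and column names come straight from JSON keys, they are wrapped in double quotes so that spaces or SQL keywords still form valid SQLite identifiers. A small check of the statement this builds (the key names here are made up):

```python
# Hypothetical JSON keys that would break an unquoted CREATE TABLE.
cols = ["order id", "select"]
tbl_name = "demo"
syspk = "syspk integer primary key autoincrement"
other_cols = ', '.join([f"\"{f}\" TEXT" for f in cols])
create_tbl_sql = f"create table if not exists \"{tbl_name}\" ({syspk}, {other_cols})"
print(create_tbl_sql)
# create table if not exists "demo" (syspk integer primary key autoincrement,
#   "order id" TEXT, "select" TEXT)
```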
```diff
@@ -76,22 +81,25 @@ class DBConn:
                 not_found.append(col)

         if len(not_found) > 0:
+            if args.verbose >= 1:
+                print(f"added new cols {', '.join(not_found)} to {tbl}")

+            new_cols = list(tbl_cols)
             for new_col in not_found:
                 self.cur.execute(f"alter table \"{tbl}\" add column \"{new_col}\" text")
-            print(f"added new cols {', '.join(not_found)} to {tbl}")
-        self.table_mapping[tbl] = list(col_names)
+                new_cols.append(new_col)
+            self.table_mapping[tbl] = new_cols

         sql = f"insert into \"{tbl}\" ({col_names_placeholder}) values({value_placeholders})"
-        if args.debug:
+        if args.verbose == 2:
             print(f"insert sql = {sql}")
         self.cur.execute(sql, values)

     def make_csv_from_tables(self, prefix=''):
-        dbConn.cur.execute("SELECT name FROM sqlite_master WHERE type='table'")
+        json_csv.cur.execute("SELECT name FROM sqlite_master WHERE type='table'")
         tbls = []

-        for (tbl,) in dbConn.cur.fetchall():
+        for (tbl,) in json_csv.cur.fetchall():
             if tbl.find(args.name) == 0:
                 tbls.append(tbl)

```
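Instead of reusing the stale `col_names` after `ALTER TABLE`, the insert path now maintains the evolving column list itself in `new_cols`. A self-contained sketch of the same add-columns-on-the-fly pattern against an in-memory SQLite database (the table and record here are hypothetical):

```python
import sqlite3

con = sqlite3.connect(":memory:")
cur = con.cursor()
cur.execute('create table "t" (syspk integer primary key autoincrement, "a" TEXT)')

existing = ["a"]
record = {"a": "1", "b": "2"}  # "b" is a key the table has not seen before
not_found = [c for c in record if c not in existing]

new_cols = list(existing)
for new_col in not_found:
    cur.execute(f'alter table "t" add column "{new_col}" text')
    new_cols.append(new_col)

placeholders = ', '.join('?' * len(new_cols))
cur.execute(f'insert into "t" ({", ".join(new_cols)}) values ({placeholders})',
            [record[c] for c in new_cols])
print(new_cols)  # ['a', 'b']
```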
```diff
@@ -133,23 +141,31 @@ class DBConn:
         self.table_mapping[f"{prev_step}{current_level}"] = attributes
         return {"flat_attributes": flat_attributes, "children": children, "attributes": attributes}

-    def extract_child_value(self, item, k, ext, prev_step, join_col_value):
-        child_value = {args.join_column: join_col_value}
-        for child in item[k]:
+    def extract_child_value(self, item, current_level, ext, prev_step, prev_step_seq, join_col_value):
+        k_ = f"{prev_step}{current_level}"
+
+        for seqNo, child in enumerate(item[current_level]):
+            child_value = {
+                args.join_column: join_col_value,
+                f"{prev_step}seq": prev_step_seq,
+                f"{current_level}_seq": seqNo + 1
+            }
             for (subKey, subValue) in child.items():
                 is_ext = subKey in ext.get("children", {}).keys()
                 if is_ext:
                     self.extract_child_value(child, subKey, ext["children"][subKey],
-                                             f"{prev_step}{k}_",
-                                             join_col_value)
+                                             f"{prev_step}{current_level}_",
+                                             seqNo + 1, join_col_value)
                 else:
-                    child_header = f"{k}_{subKey}"
+                    child_header = f"{current_level}_{subKey}"
                     child_value[child_header] = subValue
-
-        k_ = f"{prev_step}{k}"
-        dbConn.write_to_database(k_, child_value)
+            self.write_to_database(k_, child_value)

     def make_tables(self):
+        if args.verbose >= 1:
+            print("making tables...")
+            for (mhKey, mhVal) in self.table_mapping.items():
+                print(f"table {mhKey}, cols {mhVal}")
         for (mhKey, mhVal) in self.table_mapping.items():
             self.make_table(mhKey, mhVal)

```
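Every child row now records its position: `{prev_step}seq` carries the parent's sequence number down the recursion and `{current_level}_seq` is the child's own 1-based index, so the nested CSVs can later be re-joined in document order. A sketch of the keys this produces for one hypothetical record, with `root` standing in for the top-level name:

```python
# One top-level item (seq 1) with a nested list, as the recursion would see it.
item = {"id": "A1", "orders": [{"sku": "x"}, {"sku": "y"}]}

for seqNo, child in enumerate(item["orders"]):
    row = {"id": item["id"],         # join column
           "root_seq": 1,            # parent's sequence number
           "orders_seq": seqNo + 1}  # this child's 1-based position
    row.update({f"orders_{k}": v for k, v in child.items()})
    print(row)
# {'id': 'A1', 'root_seq': 1, 'orders_seq': 1, 'orders_sku': 'x'}
# {'id': 'A1', 'root_seq': 1, 'orders_seq': 2, 'orders_sku': 'y'}
```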
```diff
@@ -190,7 +206,7 @@ class DBConn:
                     child_attribute = f"{key}_{subKey}"
                     if child_attribute not in attributes:
                         attributes.append(child_attribute)

             elif value_is_list:
                 existing = children.get(key, {})
                 children[key] = self.extract_child(value, key, f"{top_level}_", existing)
```
```diff
@@ -211,15 +227,16 @@ class DBConn:

         children_names = children.keys()

+        top_level_idx = 0
         # second pass, make flat json from original-json, put to sqlite, use pandas to make csv
         for inp_file in input_files:

-            if args.debug:
+            if args.verbose >= 1:
                 print(f"processing file {inp_file}")

             for topLevelItem in ijson.items(open(input_path / inp_file), "item"):
                 keys = topLevelItem.keys()
-                flat_json = {}
+                flat_json = {f"{top_level}_seq": top_level_idx + 1}
                 for key in keys:
                     if key in flat_attributes:
                         for (subKey, subValue) in topLevelItem[key].items():
```
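`top_level_idx` numbers each top-level record as it streams in, and the 1-based value is seeded into the flat row as `{top_level}_seq`. For reference, `ijson.items(f, "item")` lazily yields each element of a top-level JSON array; a minimal illustration with an in-memory stream (the real code reads `input_path / inp_file`):

```python
import io
import ijson

data = io.BytesIO(b'[{"id": 1}, {"id": 2}]')
top_level_idx = 0
for topLevelItem in ijson.items(data, "item"):
    # "root" stands in for the script's top_level name.
    flat_json = {"root_seq": top_level_idx + 1, **topLevelItem}
    print(flat_json)
    top_level_idx += 1
# {'root_seq': 1, 'id': 1}
# {'root_seq': 2, 'id': 2}
```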
```diff
@@ -227,12 +244,16 @@ class DBConn:
                             flat_json[child_attribute] = subValue
                     elif key in children_names:
                         ext = children[key]
-                        self.extract_child_value(topLevelItem, key, ext, f"{top_level}_",
+                        self.extract_child_value(topLevelItem,
+                                                 key,
+                                                 ext, f"{top_level}_",
+                                                 top_level_idx + 1,
                                                  topLevelItem[args.join_column])
                     else:
                         flat_json[key] = topLevelItem[key]

                 self.write_to_database(top_level, flat_json)
+                top_level_idx += 1

             if not args.single:
                 self.con.commit()
```
```diff
@@ -247,7 +268,14 @@ class DBConn:
             make_archive(args.output, pathlib.Path(f"{top_level}.zip"))


-dbConn = DBConn()

 if __name__ == '__main__':
-    dbConn.parse_json()
+    if args.verbose >= 1:
+        print(f"args = {args}")
+
+    if args.clean:
+        for d in args.output.iterdir():
+            print(f"will delete {d}")
+            os.unlink(d)
+
+    json_csv = JsonToCsv()
+    json_csv.parse_json()
```
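The new `--clean` pass deletes whatever is in the output directory before a run. `os.unlink` only removes files, so this assumes the directory holds the script's flat outputs; a stand-alone sketch under that assumption:

```python
import os
import pathlib

output = pathlib.Path("out")  # placeholder for args.output
if output.is_dir():
    for d in output.iterdir():
        print(f"will delete {d}")
        # Removes files only; raises IsADirectoryError (PermissionError
        # on Windows) if a subdirectory is encountered.
        os.unlink(d)
```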