From 6082809715ea0b0a9a79e5188f1b0ef8333d1af8 Mon Sep 17 00:00:00 2001
From: "gowthaman.b"
Date: Mon, 19 Jun 2023 17:31:40 +0530
Subject: [PATCH] move to a class based thingy, make schema name specific

---
 main.py | 295 ++++++++++++++++++++++++++++----------------------------
 1 file changed, 149 insertions(+), 146 deletions(-)

diff --git a/main.py b/main.py
index e8b300a..c21582d 100644
--- a/main.py
+++ b/main.py
@@ -26,14 +26,16 @@ class DBConn:
         self.con = None
         self.counter = 0
         self.ts = ''
-        self.reinit_db()
         self.table_col_map = {}
+        self.child_tables = {}
+        self.init_db()
 
-    def reinit_db(self):
+    def init_db(self):
         self.counter += 1
         self.ts = time.strftime('%Y%m%d_%H%M%S', time.localtime())
         self.con = sqlite3.connect(args.output / f"data-{args.name}-{self.ts}-{self.counter}.db")
         self.cur = self.con.cursor()
+        self.make_tables()
 
     def make_table(self, tbl_name, cols):
         syspk = "syspk integer primary key autoincrement"
@@ -86,151 +88,152 @@ class DBConn:
         clients = pd.read_sql(f"SELECT * FROM \"{tbl}\"", self.con)
         clients.to_csv(args.output / f"{prefix}{tbl}.csv", index=False, sep=args.delimiter)
 
+    def extract_child(self, items_list, current_level, prev_step, existing, step):
+        # collect column names for a list-valued field (recursing into nested
+        # lists) and record them in self.child_tables
+        attributes = existing.get("attributes", [args.join_column])
+        flat_attributes = existing.get("flat_attributes", [])
+        children = existing.get("children", {})
+
+        for child_idx in range(len(items_list)):
+            child = items_list[child_idx]
+            for (subKey, subValue) in child.items():
+
+                is_dict = isinstance(subValue, dict)
+                is_list = isinstance(subValue, list)
+
+                if is_dict:
+                    if subKey not in flat_attributes:
+                        flat_attributes.append(subKey)
+                    for (sub2Key, sub2Value) in subValue.items():
+                        composite_key = f"{current_level}_{sub2Key}"
+                        if composite_key not in attributes:
+                            attributes.append(composite_key)
+                if is_list:
+                    if subKey not in children.keys():
+                        existing_next_step = children.get(subKey, {})
+                        children[subKey] = self.extract_child(subValue, subKey,
+                                                              f"{prev_step}{current_level}_",
+                                                              existing_next_step, step + 1)
+                else:
+                    child_header = f"{current_level}_{subKey}"
+                    if child_header not in attributes:
+                        attributes.append(child_header)
+
+        self.child_tables[f"{prev_step}{current_level}"] = attributes
+        return {"flat_attributes": flat_attributes, "children": children, "attributes": attributes}
+
+    def extract_child_value(self, item, k, ext, prev_step, join_col_value):
+        # flatten a record's nested list field into a row for its child table,
+        # tagged with args.join_column so it can be joined back to the parent
+        child_value = {args.join_column: join_col_value}
+        for child in item[k]:
+            for (subKey, subValue) in child.items():
+                is_ext = subKey in ext.get("children", {}).keys()
+                if is_ext:
+                    self.extract_child_value(child, subKey, ext["children"][subKey],
+                                             f"{prev_step}{k}_",
+                                             join_col_value)
+                else:
+                    child_header = f"{k}_{subKey}"
+                    child_value[child_header] = subValue
+
+        k_ = f"{prev_step}{k}"
+        self.write_to_database(k_, child_value)
+
+    def make_tables(self):
+        for (mhKey, mhVal) in self.child_tables.items():
+            self.make_table(mhKey, mhVal)
+
+    def parse_json(self):
+        children = {}
+        flat_attributes = []
+        attributes = []
+
+        input_path = args.input
+
+        input_files = [x.name for x in input_path.iterdir() if
+                       not x.is_dir() and x.is_file() and x.name.endswith(".json")]
+        print(f"found input file(s) {', '.join(input_files)} in path {args.input}")
+
+        if len(input_files) == 0:
+            print("could not find any input files, we shall stop")
+            return
+
+        parsed = 0
+        top_level = args.name
+
+        # first pass, collect all headers
+        print(f"parsing {input_files[0]} for metadata")
+        for topLevelItem in ijson.items(open(input_path / input_files[0]), "item"):
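+            # stop the metadata scan once args.metadata records have been inspected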
+            if parsed == args.metadata:
+                print(f"parsed {parsed} records for metadata")
+                break
+
+            for (key, value) in topLevelItem.items():
+                value_is_dict = isinstance(value, dict)
+                value_is_list = isinstance(value, list)
+
+                if value_is_dict:
+                    if key not in flat_attributes:
+                        flat_attributes.append(key)
+
+                    for (subKey, subValue) in value.items():
+                        child_attribute = f"{key}_{subKey}"
+                        if child_attribute not in attributes:
+                            attributes.append(child_attribute)
+                elif value_is_list:
+                    existing = children.get(key, {})
+                    children[key] = self.extract_child(value, key, f"{top_level}_", existing, 1)
+                else:
+                    if key not in attributes:
+                        attributes.append(key)
+            parsed += 1
+
+        open(f"schema-{args.name}.json", "w").write(
+            json.dumps({
+                "attributes": attributes,
+                "flat_attributes": flat_attributes,
+                "children": children
+            }, indent=2)
+        )
+        self.child_tables[top_level] = attributes
+
+        self.make_tables()
+
+        children_names = children.keys()
+
+        # second pass, make flat json from original-json, put to sqlite, use pandas to make csv
+        for inp_file in input_files:
+
+            if args.debug:
+                print(f"processing file {inp_file}")
+
+            for topLevelItem in ijson.items(open(input_path / inp_file), "item"):
+                keys = topLevelItem.keys()
+                flat_json = {}
+                for key in keys:
+                    if key in flat_attributes:
+                        for (subKey, subValue) in topLevelItem[key].items():
+                            child_attribute = f"{key}_{subKey}"
+                            flat_json[child_attribute] = subValue
+                    elif key in children_names:
+                        ext = children[key]
+                        self.extract_child_value(topLevelItem, key, ext, f"{top_level}_",
+                                                 topLevelItem[args.join_column])
+                    else:
+                        flat_json[key] = topLevelItem[key]
+
+                self.write_to_database(top_level, flat_json)
+
+            if not args.single:
+                self.con.commit()
+                self.make_csv_from_tables(prefix=f"{pathlib.Path(args.output / inp_file).stem}-")
+                self.init_db()
+
+        if args.single:
+            self.con.commit()
+            self.make_csv_from_tables()
+
 dbConn = DBConn()
-
-def extract_child(merge_headers, items_list, top_level, prev_step, existing, step):
-    headers = merge_headers.get(top_level, [args.join_column])
-
-    flat_keys = existing.get("flat_keys", [])
-    extract_keys = existing.get("extract_keys", {})
-
-    for child_idx in range(len(items_list)):
-        child = items_list[child_idx]
-        for (subKey, subValue) in child.items():
-
-            is_dict = isinstance(subValue, dict)
-            is_list = isinstance(subValue, list)
-
-            if is_dict:
-                if subKey not in flat_keys:
-                    flat_keys.append(subKey)
-                for (sub2Key, sub2Value) in subValue.items():
-                    composite_key = f"{top_level}_{sub2Key}"
-                    if composite_key not in headers:
-                        headers.append(composite_key)
-            if is_list:
-                if subKey not in extract_keys.keys():
-                    existing_next_step = extract_keys.get(subKey, {})
-                    extract_keys[subKey] = extract_child(merge_headers, subValue, subKey, f"{prev_step}{top_level}_",
-                                                         existing_next_step, step + 1)
-            else:
-                child_header = f"{top_level}_{subKey}"
-                if child_header not in headers:
-                    headers.append(child_header)
-
-    merge_headers[f"{prev_step}{top_level}"] = headers
-    return {"flat_keys": flat_keys, "extract_keys": extract_keys}
-
-
-def extract_child_value(merge_headers, item, k, ext, prev_step, join_col_value):
-    child_value = {args.join_column: join_col_value}
-    for child in item[k]:
-        for (subKey, subValue) in child.items():
-            is_ext = subKey in ext.get("extract_keys", {}).keys()
-            if is_ext:
-                extract_child_value(merge_headers, child, subKey, ext["extract_keys"][subKey], f"{prev_step}{k}_",
-                                    join_col_value)
-            else:
-                child_header = f"{k}_{subKey}"
-                child_value[child_header] = subValue
-
-    k_ = f"{prev_step}{k}"
-    dbConn.write_to_database(k_, child_value)
-
-
-def parse_json():
-    extract_keys = {}
-    flat_keys = []
-
-    headers = []
-    merge_headers = {}
-
-    input_path = args.input
-
-    input_files = [x.name for x in input_path.iterdir() if not x.is_dir() and x.is_file() and x.name.endswith(".json")]
-    print(f"found input file(s) {', '.join(input_files)} in path {args.input}")
-
-    if len(input_files) == 0:
-        print("could not find any input files, we shall stop")
-        return
-
-    parsed = 0
-    top_level = args.name
-
-    # first pass, collect all headers
-    print(f"parsing {input_files[0]} for metadata")
-    for topLevelItem in ijson.items(open(input_path / input_files[0]), "item"):
-        if parsed == args.metadata:
-            print(f"parsed {parsed} records for metadata")
-            break
-
-        for (key, value) in topLevelItem.items():
-            value_is_dict = isinstance(value, dict)
-            value_is_list = isinstance(value, list)
-
-            if value_is_dict:
-                if key not in flat_keys:
-                    flat_keys.append(key)
-
-                for (subKey, subValue) in value.items():
-                    composite_key = f"{key}_{subKey}"
-                    if composite_key not in headers:
-                        headers.append(composite_key)
-            elif value_is_list:
-                existing = extract_keys.get(key, {})
-                extract_keys[key] = extract_child(merge_headers, value, key, f"{top_level}_", existing, 1)
-            else:
-                if key not in headers:
-                    headers.append(key)
-        parsed += 1
-
-    open("schema.json", "w").write(
-        json.dumps({
-            "flat_keys": flat_keys,
-            "extract_keys": extract_keys
-        }, indent=2)
-    )
-    dbConn.make_table(top_level, headers)
-
-    for (mhKey, mhVal) in merge_headers.items():
-        dbConn.make_table(mhKey, mhVal)
-
-    extract_keys_names = extract_keys.keys()
-
-    # second pass, make flat json from original-json, create csv ( we will use sqlite, as it is faster to write)
-    for inp_file in input_files:
-        for topLevelItem in ijson.items(open(input_path / inp_file), "item"):
-            keys = topLevelItem.keys()
-            flat_json = {}
-            for key in keys:
-                if key in flat_keys:
-                    for (subKey, subValue) in topLevelItem[key].items():
-                        composite_key = f"{key}_{subKey}"
-                        flat_json[composite_key] = subValue
-                elif key in extract_keys_names:
-                    ext = extract_keys[key]
-                    extract_child_value(merge_headers, topLevelItem, key, ext, f"{top_level}_",
-                                        topLevelItem[args.join_column])
-                else:
-                    flat_json[key] = topLevelItem[key]
-
-            dbConn.write_to_database(top_level, flat_json)
-
-        if not args.single:
-            dbConn.con.commit()
-            dbConn.make_csv_from_tables(prefix=f"{pathlib.Path(args.output / inp_file).stem}-")
-            dbConn.reinit_db()
-            dbConn.make_table(top_level, headers)
-
-            for (mhKey, mhVal) in merge_headers.items():
-                dbConn.make_table(mhKey, mhVal)
-
-    if args.single:
-        dbConn.con.commit()
-        dbConn.make_csv_from_tables()
-
-
 if __name__ == '__main__':
-    parse_json()
+    dbConn.parse_json()