update

This commit is contained in:
parent 3365e5f87d
commit 028d6ac265

README.md | 14
@@ -5,9 +5,15 @@ with support for nested structures

Steps

* place the transactions.json file in the same directory
* adjust the flat_keys and extract_keys variables in main.py
* install the required packages: `pip install -r requirements.txt`
* run `python3 main.py`
* if successful, creates transactions.db (a SQLite database)
* also creates CSV files
* required `--input <dir of input json>`
* required `--output <dir of output csvs>`
* (optional) merge all json into a single csv: `--single`
* required `--schema <path to schema.json>`
* (optional) number of records to scan when collecting headers: `--metadata <number of records>`, defaults to 100
* required `--join-column <column name from first level to use as merge column>`
* required `--name <base-name of data>`
* example:
* `python3 main.py --input /var/tmp/input --output /var/tmp/outputs --schema /var/tmp/schema.json --name transactions --single --metadata 1000 --join-column origin_id`
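For reference, `main.py` now reads `flat_keys` and `extract_keys` from the file passed via `--schema` instead of the previously hard-coded variables. A minimal sketch of what such a `schema.json` could contain, assuming the same shape as the configuration removed from `main.py` in this commit (the path below is only illustrative):

```python
# Sketch, not part of the commit: write a schema file matching the
# flat_keys/extract_keys layout that parse_json() reads from --schema.
import json

schema = {
    "flat_keys": ["cost_center", "location", "customer"],
    "extract_keys": {
        "price_modifiers": {"flat_keys": [], "extract_keys": {}},
        "sale_items": {
            "flat_keys": [],
            "extract_keys": {
                "categories": {"key": "categories"},
                "modifiers": {
                    "key": "modifiers",
                    "flat_keys": [],
                    "extract_keys": {"categories": {"key": "categories"}},
                },
            },
        },
    },
}

# e.g. the /var/tmp/schema.json used in the example command above
with open("/var/tmp/schema.json", "w") as fh:
    json.dump(schema, fh, indent=2)
```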
main.py | 217
@@ -1,4 +1,6 @@
import json
import pathlib
import time

import ijson
import sqlite3
@@ -6,45 +8,73 @@ import pandas as pd
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("-i", "--input", help="folder containing input json(s)", required=True, type=pathlib.Path)
parser.add_argument("-o", "--output", help="folder to place csv", required=True, type=pathlib.Path)
parser.add_argument("-s", "--single", action="store_true", help="merge all json files to single output csv")
parser.add_argument("-j", "--join-column", help="join column from top-level to merge nested json", required=True)
parser.add_argument("-m", "--metadata", type=int, help="how many records to parse for building metadata", default=100)
parser.parse_args()
parser.add_argument("--input", help="folder containing input json(s)", required=True, type=pathlib.Path)
parser.add_argument("--output", help="folder to place csv", required=True, type=pathlib.Path)
parser.add_argument("--schema", help="schema json", required=True, type=pathlib.Path)
parser.add_argument("--single", action="store_true", help="merge all json files to single output csv")
parser.add_argument("--metadata", type=int, help="how many records to parse for building metadata", default=100)
parser.add_argument("--join-column", help="join column from top-level to merge nested json", required=True)
parser.add_argument("--name", help="join column from top-level to merge nested json", required=True)
|
||||
|
||||
flat_keys = ["cost_center", "location", "customer"]
extract_keys = {
    "price_modifiers": {
        "flat_keys": [],
        "extract_keys": {}
    },
    "sale_items": {
        "flat_keys": [],
        "extract_keys": {
            "categories": {
                "key": "categories",
            },
            "modifiers": {
                "key": "modifiers",
                "flat_keys": [],
                "extract_keys": {
                    "categories": {
                        "key": "categories",
                    }
                }
            }
        }
    }
}
args = parser.parse_args()

parent = "transactions"
con = sqlite3.connect(f"{parent}.db")
cur = con.cursor()
schema = json.load(open(args.schema))

print(f"join cols {args.join_column}")


class DBConn:
    def __init__(self):
        self.cur = None
        self.con = None
        self.counter = 0
        self.ts = ''
        self.reinit_db()

    def reinit_db(self):
        self.counter += 1
        self.ts = time.strftime('%Y%m%d_%H%M%S', time.localtime())
        self.con = sqlite3.connect(args.output / f"data-{args.name}-{self.ts}-{self.counter}.db")
        self.cur = self.con.cursor()

    def make_table(self, tbl_name, cols):
        syspk = "syspk integer primary key autoincrement"
        other_cols = ', '.join([f"{f} TEXT" for f in cols])
        create_tbl_sql = f"create table if not exists {tbl_name} ({syspk}, {other_cols})"
        self.cur.execute(create_tbl_sql)

    def write_to_database(self, tbl, cols):
        keys = cols.keys()  # OrderedDict

        col_names = ', '.join(
            [x for x in keys]
        )
        value_placeholders = ', '.join(
            ["?" for _ in keys]
        )
        # trim values of spaces
        values = tuple([str(cols[k]).strip() for k in keys])

        sql = f"insert into {tbl} ({col_names}) values({value_placeholders})"

        self.cur.execute(sql, values)

    def make_csv_from_tables(self, prefix=''):
        self.cur.execute("SELECT name FROM sqlite_master WHERE type='table'")
        tbls = []

        for (tbl,) in self.cur.fetchall():
            if tbl.find("transactions") == 0:
                tbls.append(tbl)

        for tbl in tbls:
            clients = pd.read_sql(f"SELECT * FROM {tbl}", self.con)
            clients.to_csv(args.output / f"{prefix}{tbl}.csv", index=False)


dbConn = DBConn()


def extract_child(merge_headers, item, k, ext, prev_step):
    child_headers = merge_headers.get(k, [])
    child_headers = merge_headers.get(k, [args.join_column])

    for child in item[k]:
        for (subKey, subValue) in child.items():
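For illustration, a minimal standalone sketch of the flattening scheme that `extract_child` and `extract_child_value` drive from the schema: dict-valued `flat_keys` become prefixed columns on the parent row, while list-valued `extract_keys` become rows in child tables that carry the `--join-column` value so they can later be merged back. The record and key names below are made up; this is not part of the commit.

```python
# Sketch of the schema-driven flattening; all names here are illustrative.
record = {
    "origin_id": "tx-1",
    "total": 42,
    "customer": {"id": "c-9", "name": "Ada"},     # a flat key: flattened into the parent row
    "sale_items": [{"sku": "A", "qty": 2}],        # an extract key: becomes child-table rows
}
flat_keys = ["customer"]
extract_keys = {"sale_items": {"flat_keys": [], "extract_keys": {}}}
join_column = "origin_id"

parent_row, child_rows = {}, []
for k, v in record.items():
    if k in flat_keys:
        for fk, fv in v.items():
            parent_row[f"{k}_{fk}"] = fv           # e.g. customer_id, customer_name
    elif k in extract_keys:
        for child in v:
            row = {join_column: record[join_column]}   # keep the merge column on every child row
            row.update({f"{k}_{ck}": cv for ck, cv in child.items()})
            child_rows.append(row)
    else:
        parent_row[k] = v

print(parent_row)   # {'origin_id': 'tx-1', 'total': 42, 'customer_id': 'c-9', 'customer_name': 'Ada'}
print(child_rows)   # [{'origin_id': 'tx-1', 'sale_items_sku': 'A', 'sale_items_qty': 2}]
```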
@@ -59,73 +89,47 @@ def extract_child(merge_headers, item, k, ext, prev_step):
    merge_headers[f"{prev_step}{k}"] = child_headers


global_counter = 1


def extract_child_value(merge_headers, item, k, ext, prev_step):
def extract_child_value(merge_headers, item, k, ext, prev_step, join_col_value):
    child_value = {}
    child_value[args.join_column] = join_col_value
    for child in item[k]:
        for (subKey, subValue) in child.items():
            is_ext = subKey in ext.get("extract_keys", {}).keys()
            if is_ext:
                extract_child_value(merge_headers, child, subKey, ext["extract_keys"][subKey], f"{prev_step}{k}_")
                extract_child_value(merge_headers, child, subKey, ext["extract_keys"][subKey], f"{prev_step}{k}_",
                                    join_col_value)
            else:
                child_header = f"{k}_{subKey}"
                child_value[child_header] = subValue

    k_ = f"{prev_step}{k}"
    write_to_database(k_, child_value)


def make_table(tbl_name, cols):
    syspk = "syspk integer primary key autoincrement"
    other_cols = ', '.join([f"{f} TEXT" for f in cols])
    create_tbl_sql = f"create table if not exists {tbl_name} ({syspk}, {other_cols})"
    # print(f"{tbl_name} = ", cols)
    # print(f"{tbl_name} = ", create_tbl_sql)
    cur.execute(create_tbl_sql)


def write_to_database(tbl, cols):
    keys = cols.keys()

    col_names = ', '.join(
        [x for x in keys]
    )
    value_placeholders = ', '.join(
        ["?" for x in keys]
    )

    values = tuple([str(cols[k]).strip() for k in keys])

    sql = f"insert into {tbl} ({col_names}) values({value_placeholders})"

    # print(f"execute {sql} with values {values}")

    cur.execute(sql, values)


def make_csv_from_tables():
    cur.execute("SELECT name FROM sqlite_master WHERE type='table'")
    tbls = []

    for (tbl,) in cur.fetchall():
        # print("tbl = ", tbl)
        if tbl.find("transactions") == 0:
            tbls.append(tbl)

    for tbl in tbls:
        clients = pd.read_sql(f"SELECT * FROM {tbl}", con)
        clients.to_csv(f"{tbl}.csv", index=False)
    dbConn.write_to_database(k_, child_value)


def parse_json():
    extract_keys = schema["extract_keys"]
    flat_keys = schema["flat_keys"]

    extract_keys_names = extract_keys.keys()
    headers = []
    merge_headers = {}

    input_path = args.input

    input_files = [x.name for x in input_path.iterdir() if not x.is_dir() and x.is_file() and x.name.endswith(".json")]
    print(f"found input file(s) {', '.join(input_files)} in path {args.input}")

    if len(input_files) == 0:
        print("could not find any input files, we shall stop")
        return

    parsed = 0
    parent = args.name
    # first pass, collect all headers
    for item in ijson.items(open("transactions.json"), "item"):
    for item in ijson.items(open(input_path / input_files[0]), "item"):
        if parsed > args.metadata:
            print(f"parsed {parsed} records for metadata")
            break
        keys = item.keys()
        for k in keys:
            if k in flat_keys:
@@ -140,32 +144,43 @@ def parse_json():
            else:
                if k not in headers:
                    headers.append(k)
        parsed += 1

    make_table(parent, headers)
    dbConn.make_table(parent, headers)

    for (mhKey, mhVal) in merge_headers.items():
        make_table(mhKey, mhVal)
        dbConn.make_table(mhKey, mhVal)

    # second pass, make flat json from original-json, create csv (we will use sqlite, as it is faster to write)
    for inp_file in input_files:
        for item in ijson.items(open(input_path / inp_file), "item"):
            keys = item.keys()
            flat_json = {}
            for k in keys:
                if k in flat_keys:
                    for (fk, fv) in item[k].items():
                        composite_key = f"{k}_{fk}"
                        flat_json[composite_key] = fv
                elif k in extract_keys_names:
                    ext = extract_keys[k]
                    extract_child_value(merge_headers, item, k, ext, f"{parent}_", item[args.join_column])
                else:
                    flat_json[k] = item[k]

    for item in ijson.items(open("transactions.json"), "item"):
        keys = item.keys()
        flat_json = {}
        for k in keys:
            if k in flat_keys:
                for (fk, fv) in item[k].items():
                    composite_key = f"{k}_{fk}"
                    flat_json[composite_key] = fv
            elif k in extract_keys_names:
                ext = extract_keys[k]
                extract_child_value(merge_headers, item, k, ext, f"{parent}_")
            else:
                flat_json[k] = item[k]
            dbConn.write_to_database(parent, flat_json)

        write_to_database(parent, flat_json)
    con.commit()
        if not args.single:
            dbConn.con.commit()
            dbConn.make_csv_from_tables(prefix=f"{pathlib.Path(args.output / inp_file).stem}-")
            dbConn.reinit_db()
            dbConn.make_table(parent, headers)

    make_csv_from_tables()
            for (mhKey, mhVal) in merge_headers.items():
                dbConn.make_table(mhKey, mhVal)

    if args.single:
        dbConn.con.commit()
        dbConn.make_csv_from_tables()


if __name__ == '__main__':
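Once the CSVs are written, the `--join-column` value carried into every child table is what lets the per-table CSVs be recombined. A minimal pandas sketch of that merge, using made-up data in place of the generated files (the real CSV names follow the `{prefix}{table}.csv` pattern produced by `make_csv_from_tables`):

```python
# Sketch, not part of the commit: joining a child table back onto the parent
# on the join column. The DataFrames stand in for the generated CSVs.
import pandas as pd

transactions = pd.DataFrame(
    [{"origin_id": "tx-1", "total": 42, "customer_name": "Ada"}]
)
transactions_sale_items = pd.DataFrame(
    [{"origin_id": "tx-1", "sale_items_sku": "A", "sale_items_qty": 2},
     {"origin_id": "tx-1", "sale_items_sku": "B", "sale_items_qty": 1}]
)

# In practice these would come from the generated CSVs, e.g.:
# transactions = pd.read_csv("transactions.csv")
# transactions_sale_items = pd.read_csv("transactions_sale_items.csv")

merged = transactions.merge(transactions_sale_items, on="origin_id", how="left")
print(merged)
```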