-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest.py
117 lines (90 loc) · 3.23 KB
/
test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import pros.django_initializer
from test_app.models import Person, Factoid
from pros_core.setup_app import PROS_MODELS
from icecream import ic
from neomodel import db
# Example field selection used when experimenting with build_nested_call:
# each event maps "_" to the properties selected on the event node itself and
# every other key to the properties selected on the related node.
fields = {
    event_name: {
        "_": {"label"},
        "date": {"earliest_possible", "earliest_possible_conservative"},
        "location": {"label"},
    }
    for event_name in ("birth_event", "death_event")
}
def build_select_value_string(select_values, unpack_values):
    """Build the body of a Cypher map projection, e.g. ``{.label,date: date}``.

    Args:
        select_values: Iterable of property names; each is emitted as a
            ``.name`` selector. ``None`` is treated as empty.
        unpack_values: Iterable of keys (a dict iterates over its keys); each
            is emitted as ``key: key``. The ``"_"`` key is skipped. ``None``
            or an empty container contributes nothing.

    Returns:
        The projection wrapped in braces; ``"{}"`` when nothing is selected.
    """
    selectors = ", ".join(f".{v}" for v in select_values or ())
    unpacked = ", ".join(f"{k}: {k}" for k in unpack_values or () if k != "_")
    # Only separate the two groups when both are non-empty, so the result
    # never carries a dangling comma (e.g. when unpack_values holds only "_").
    separator = "," if selectors and unpacked else ""
    return "{" + selectors + separator + unpacked + "}"
def build_nested_call(fields, current_node="a", key=None, n=0):
    """Recursively render nested Cypher ``CALL { ... }`` subqueries.

    Args:
        fields: Either a set (a leaf: nothing is rendered) or a dict mapping
            relation names to a nested dict or set. A ``"_"`` entry is not
            traversed as a relation; in a nested dict it supplies the
            properties selected on that dict's own node.
        current_node: Cypher variable the subqueries start from.
        key: Relation name being expanded. Supplied by the recursive calls
            but not read by this function.
        n: Recursion depth. Supplied by the recursive calls but not read by
            this function.

    Returns:
        The concatenated Cypher text, or ``""`` when ``fields`` is a set or
        contains no relation keys.
    """
    if isinstance(fields, set):
        return ""

    res = ""
    for k1, v1 in fields.items():
        if k1 == "_":
            # "_" carries property names, not a relation to expand.
            continue

        is_leaf = isinstance(v1, set)
        # A nested dict without "_" selects no own properties; default to an
        # empty set instead of None so the projection builder can iterate it.
        select_values = v1 if is_leaf else v1.get("_", set())

        res += f"""
CALL {{
WITH {current_node}
OPTIONAL MATCH ({current_node})-[r_{k1}]-(o_{k1})
WHERE type(r_{k1}) = '{k1.upper()}' OR r_{k1}.reverse_name = '{k1.upper()}'
"""
        res += build_nested_call(v1, current_node=f"o_{k1}", key=k1, n=n + 1)
        if is_leaf:
            res += f"RETURN o_{k1}{build_select_value_string(select_values, {})} AS {k1} }}"
        else:
            res += f"RETURN COLLECT(o_{k1}{build_select_value_string(select_values, v1)}) AS {k1} }}"
    return res
def get_relations(entity_type):
    """Return the non-inline relation fields registered for ``entity_type``.

    Reads ``PROS_MODELS[entity_type].fields`` and keeps every entry whose
    ``"type"`` is ``"relation"`` and whose ``"inline_relation"`` is falsy.
    Field names are returned unchanged.
    """
    relations = {}
    for field_name, field_spec in PROS_MODELS[entity_type].fields.items():
        if field_spec["type"] != "relation":
            continue
        if field_spec["inline_relation"]:
            continue
        relations[field_name] = field_spec
    return relations
def get_inline_relations(entity_type):
    """Return the inline relation fields registered for ``entity_type``.

    Reads ``PROS_MODELS[entity_type].fields`` and keeps every entry whose
    ``"type"`` is ``"relation"`` and whose ``"inline_relation"`` is truthy.
    Field names are lower-cased in the returned dict.
    """
    inline_relations = {}
    for field_name, field_spec in PROS_MODELS[entity_type].fields.items():
        if field_spec["type"] != "relation":
            continue
        if not field_spec["inline_relation"]:
            continue
        inline_relations[field_name.lower()] = field_spec
    return inline_relations
def build_query(entity_type):
    """Build a Cypher query returning one node merged with its related nodes.

    For every non-inline relation reported by ``get_relations(entity_type)``
    the query optionally matches the outgoing relation, collects the matched
    nodes into ``<relation_name>_nodes`` and finally returns
    ``apoc.map.setValues(n, [...])`` with one ``"<relation_name>", <list>``
    pair per relation.

    Args:
        entity_type: Key into ``PROS_MODELS``; its capitalized form is used
            as the node label in the ``MATCH`` clause.

    Returns:
        The query text as a string.
    """
    # NOTE(review): the uid below is hard-coded; the disabled driver code at
    # the bottom of this file passes a ``uid`` query parameter, so ``$uid``
    # may have been intended here -- confirm before changing.
    collected_vars = []
    match_clauses = []
    return_pairs = []
    for relation_name, relation_dict in get_relations(entity_type).items():
        relation_type = relation_dict["relation_type"]
        relation_to = relation_dict["relation_to"]
        nodes_var = f"{relation_name}_nodes"
        # Carry lists collected for earlier relations through this WITH,
        # otherwise they would fall out of scope before the final RETURN.
        carried = "".join(f"{name}, " for name in collected_vars)
        # The collect() step is what defines <relation_name>_nodes; without
        # it the RETURN clause would reference variables that are never bound.
        match_clauses.append(f"""
OPTIONAL MATCH (n)-[{relation_name}_rel:{relation_type}]->({relation_name}_node:{relation_to})
WITH n, {carried}collect({relation_name}_node) AS {nodes_var}
""")
        collected_vars.append(nodes_var)
        return_pairs.append(f'"{relation_name}", {nodes_var}')

    outgoing_query = "".join(match_clauses)
    return_unpacking_string = ", ".join(return_pairs)
    return_string = f"RETURN apoc.map.setValues(n, [{return_unpacking_string}])"
    return f"""
MATCH (n:{entity_type.capitalize()} {{uid:"b736fb96ca624d0f8e5268d47e754813"}})
{outgoing_query}
{return_string}
"""
# Debug driver: print the Cypher text generated for the "birth" entity type.
print(build_query("birth"))
# The bare string below is disabled code kept as a no-op expression statement.
# If enabled, it would run the query through neomodel's `db.cypher_query` and
# inflate the first cell of the first result row into the "birth" model.
"""results, meta = db.cypher_query(
build_query("birth"), {"uid": "b736fb96ca624d0f8e5268d47e754813"}
)
r = PROS_MODELS["birth"].model.inflate(results[0][0])
ic(r)
"""
# Dump the non-inline relation fields of the "birth" model via icecream.
ic(get_relations("birth"))