-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmagic.py
More file actions
156 lines (145 loc) · 5.78 KB
/
magic.py
File metadata and controls
156 lines (145 loc) · 5.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
from IPython.core.magic import Magics, cell_magic, magics_class
from IPython.core.magic_arguments import (
argument, magic_arguments, parse_argstring
)
import time, json
from jsoniq.session import RumbleSession
from py4j.protocol import Py4JJavaError
@magics_class
class JSONiqMagic(Magics):
    """IPython magics that execute JSONiq queries through a RumbleSession.

    ``%%jsoniq`` runs the cell body as a JSONiq query and prints its output as
    JSON (the default, or with ``-j``), as a PySpark DataFrame (``-df``) and/or
    as a pandas DataFrame (``-pdf``). ``-u`` applies a pending update list when
    the query produces one, and ``-t`` reports the elapsed time.
    """

    # Printed when a DataFrame output was requested but the response
    # returned None instead of a DataFrame.
    _NO_SCHEMA_MESSAGE = """
No DataFrame available as no schema was detected. If you still believe the output is structured enough, you could add a schema and validate expression explicitly to your query.
This is an example of how you can simply define a schema and wrap your query in a validate expression:
declare type mytype as {
  "product" : "string",
  "store-number" : "int",
  "quantity" : "decimal"
};
validate type mytype* {
  for $product in json-lines("http://rumbledb.org/samples/products-small.json", 10)
  where $product.quantity ge 995
  return $product
}
"""

    @staticmethod
    def _call_reporting_errors(func, *args):
        """Call ``func(*args)`` and print a diagnostic if it fails.

        :returns: ``(True, result)`` on success, or ``(False, None)`` after an
            error message has been printed.

        Only ``Exception`` subclasses are caught, so ``KeyboardInterrupt`` and
        ``SystemExit`` still propagate instead of being misreported as a
        network problem.
        """
        try:
            return True, func(*args)
        except Py4JJavaError as e:
            # Errors raised on the Java side: show the Java exception's message.
            print(e.java_exception.getMessage())
        except Exception as e:
            print("Query unsuccessful.")
            print("Usual reasons: firewall, misconfigured proxy.")
            print("Error message:")
            # str(e) instead of e.args[0]: the latter raises IndexError for
            # exceptions constructed without arguments.
            print(e)
        return False, None

    @magic_arguments()
    @argument(
        '-t', '--timed', action='store_true', help='Measure execution time.'
    )
    @argument(
        '-df', '--pyspark-data-frame', action='store_true', help='Prints the output as a Pyspark DataFrame (if a schema is available).'
    )
    @argument(
        '-pdf', '--pandas-data-frame', action='store_true', help='Prints the output as a Pandas DataFrame (if a schema is available).'
    )
    @argument(
        '-j', '--json', action='store_true', help='Prints the output as JSON.'
    )
    @argument(
        '-u', '--apply-updates', action='store_true', help='Applies updates if a PUL is output.'
    )
    def run(self, line, cell=None, timed=False):
        """Run a JSONiq query and print its output in the requested format(s).

        :param line: the magic's argument line, parsed for the flags below.
        :param cell: the query text; when None, ``line`` is used as the query.
        :param timed: report the elapsed time even without ``-t``.
        """
        # NOTE(review): when cell is None the same ``line`` is parsed both as
        # flags and as the query text; only the cell-magic path passes a cell.
        data = line if cell is None else cell
        args = parse_argstring(self.run, line)
        start = time.perf_counter()

        def _open_session_and_query():
            session = RumbleSession.builder.getOrCreate()
            return session, session.jsoniq(data)

        ok, result = self._call_reporting_errors(_open_session_and_query)
        if not ok:
            return
        rumble, response = result

        if args.pyspark_data_frame:
            ok, df = self._call_reporting_errors(response.df)
            if not ok:
                return
            if df is not None:
                df.show()
            else:
                print(self._NO_SCHEMA_MESSAGE)

        if args.pandas_data_frame:
            ok, pdf = self._call_reporting_errors(response.pdf)
            if not ok:
                return
            if pdf is not None:
                print(pdf)
            else:
                print(self._NO_SCHEMA_MESSAGE)

        if args.apply_updates:
            if "PUL" in response.availableOutputs():
                ok, _ = self._call_reporting_errors(response.applyPUL)
                if not ok:
                    return
                print("Updates applied successfully.")
            else:
                print("No Pending Update List (PUL) available to apply.")

        # JSON is the default output when no DataFrame format was requested.
        if args.json or not (args.pandas_data_frame or args.pyspark_data_frame):
            def _take_cap_plus_one():
                cap = rumble.getRumbleConf().getResultSizeCap()
                return cap, response.take(cap + 1)

            ok, result = self._call_reporting_errors(_take_cap_plus_one)
            if not ok:
                return
            cap, items = result
            # Requesting one item beyond the cap tells whether the output
            # exceeds it, in which case the full count is reported.
            if len(items) > cap:
                ok, count = self._call_reporting_errors(response.count)
                if not ok:
                    return
                print("The query output %s items, which is too many to display. Displaying the first %s items:" % (count, cap))
            for item in items[:cap]:
                print(json.dumps(json.loads(item.serializeAsJSON()), indent=2))

        # perf_counter() measures seconds; convert so the value matches the "ms" label.
        elapsed_ms = (time.perf_counter() - start) * 1000
        if args.timed or timed:
            print("Response time: %.1f ms" % elapsed_ms)

    @cell_magic
    def jsoniq(self, line, cell=None):
        """``%%jsoniq [flags]``: run the cell body as a JSONiq query."""
        return self.run(line, cell, False)