-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrunDDL.py
More file actions
161 lines (155 loc) · 4.87 KB
/
runDDL.py
File metadata and controls
161 lines (155 loc) · 4.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
import sys
import sqlite3
from sqlite3 import Error
import time
import threading
from urllib.parse import urlparse
import socket
from socket import error as socket_error
"""
function: init()
parameter: None
return: none
This function will get the value for clustercfg and ddlfile
then declare them as global values
"""
def init():
global clustercfg
global ddlfile
clustercfg = sys.argv[1]
ddlfile = sys.argv[2]
"""
function: do_connect()
parameter: (hash) cp, (string) ddlfile
return: none
This function receive the information about the cluster pc, and ddlfile name
Then it will connect to the server PC(s) and send the config + query
for the server to process. It will print out success, if the query is
successful executed, otherwise it will print out failed.
"""
def do_connect(cp, ddlfile):
mySocket = socket.socket()
try:
mySocket.connect((cp['host'], int(cp['port']) ))
#pc type
data_pc_type = "node"
mySocket.send(data_pc_type.encode())
#listen from server
data = mySocket.recv(1024).decode()
#send pc_config data
data = cp['host'] + ':' + cp['port'] + '/' + cp['db']
mySocket.send(data.encode())
#receive signal from server
data = mySocket.recv(1024).decode()
#send ddlfile name to server
mySocket.send(ddlfile.encode())
#receive output from server
data = mySocket.recv(1024).decode()
print (data)
mySocket.close()
except socket_error as e:
print (e)
"""
function: update_catalog_client()
parameter: (hash) cfg, (string) ddlfile
return: none
This function receive the information about the cluster pc, and ddlfile name
Then it will connect to the catalog server PC(s) and send the config + query
for the server to process. It will print out success, if the query is
successful executed, otherwise it will print out failed.
"""
def update_catalog_client(cfg, cfg_data):
#get catalog hostname from cfg string using
#parseUrl (e.g catalog.hostname=172.17.0.2:50001/mycatdb)
cat_cp = parseUrl(cfg['catalog.hostname'])
mySocket = socket.socket()
try:
mySocket.connect((cat_cp['host'], int(cat_cp['port'])))
#pc type
data_pc_type = "catalog"
mySocket.send(data_pc_type.encode())
#listen from server
data_temp = mySocket.recv(1024).decode()
#send pc_config data
data_cp = cat_cp['host'] + ':' + cat_cp['port'] + '/' + cat_cp['db']
mySocket.send(data_cp.encode())
data_temp = mySocket.recv(1024).decode()
#send cfgFile to server
mySocket.send(cfg_data.encode())
data_temp = mySocket.recv(1024).decode()
print (data_temp)
mySocket.close()
except socket_error as e:
print (e)
"""
function: parseUrl()
parameter: (string) hostname
return: (hash) node
This function receives hostname from clustercfg file as a string. Then it will
parse the string into host, port, and databse name that will contains in a node
that will be returned.
"""
def parseUrl(hostname):
node = {}
o = urlparse(hostname)
data = o.path.split('/')
node['host'] = o.scheme
node['port'] = (data[0])
node['db'] = (data[1])
return node
"""
function: parse_config()
parameter: (string) filename
return: hash (options)
This function receive the filename of the clustercfg file.
Then it will parse and store the information into a hash.
Users can retrieve the information by calling the variable
from the cfgfile
"""
def parse_config(filename):
COMMENT_CHAR = '#'
OPTION_CHAR = '='
options = {}
f = open(filename)
for line in f:
# First, remove comments:
if COMMENT_CHAR in line:
# split on comment char, keep only the part before
line, comment = line.split(COMMENT_CHAR, 1)
# Second, find lines with an option=value:
if OPTION_CHAR in line:
# split on option char:
option, value = line.split(OPTION_CHAR, 1)
# strip spaces:
option = option.strip()
value = value.strip()
# store in dictionary:
options[option] = value
f.close()
return options
"""
function: main()
parameter: none
return: none
Main function of the program
"""
def main():
if len(sys.argv) < 3:
print("Error: You didn't enter enough arguments!")
print("Usage: python3 runDDL.py ./cfgfile ./ddlfile")
sys.exit()
else:
init()
cfg = parse_config(clustercfg)
#get numnodes
numnodes = int(cfg['numnodes'])
#loop through all the nodes
for node in range(1, numnodes + 1):
cp = parseUrl(cfg['node%d.hostname' % node])
t = threading.Thread(target=do_connect(cp,ddlfile))
t.start()
t.join()
# updata catalog table
update_catalog_client(cfg, clustercfg)
if __name__ == '__main__':
main()