Skip to content

Commit afc74b4

Browse files
committed
fix: correct memory format for Spark master and worker
- Convert Kubernetes memory format (1Gi) to JVM format (1g) for SPARK_DAEMON_MEMORY
- Keep Kubernetes format for resources.limits.memory
- Add master_memory_jvm and worker_memory_jvm variables
- Fix kubectl wait timeout format (300s instead of 300)
1 parent b3801bc commit afc74b4

15 files changed

Lines changed: 227 additions & 15 deletions

File tree

cli

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Subproject commit 851e3774f014f193155233ec7997f597af46636b

devcontainer

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Subproject commit 4e69ae59fe6aea64521f2007c523577b3ed6a792

nuvolaris/kube.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,12 @@ def ctl(arg, jsonpath='{@}', flatten=False):
118118
# apply an object
119119
def apply(obj, namespace="nuvolaris"):
    """Apply a Kubernetes object with `kubectl apply -f -`.

    obj: a manifest already serialized as a string (sent unchanged), or any
        other object, which is serialized to block-style YAML first.
    namespace: namespace handed to kubectl.

    Returns whatever kubectl() returns.
    """
    if not isinstance(obj, str):
        # safe_dump emits standard YAML tags only; sort_keys=False keeps the
        # caller's key order and allow_unicode writes non-ASCII text unescaped.
        obj = yaml.safe_dump(obj, default_flow_style=False, sort_keys=False, allow_unicode=True)
    return kubectl("apply", "-f", "-", namespace=namespace, input=obj)
123128

124129
# apply an expanded template

nuvolaris/kube.py.orig

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
#
18+
# this module wraps kubectl
19+
import nuvolaris.testutil as tu
20+
import nuvolaris.template as tpl
21+
import subprocess
22+
import json
23+
import logging
24+
import yaml
25+
26+
27+
# stdout of the most recent real kubectl run (assigned by kubectl())
output = ""
# stderr of the most recent real kubectl run (assigned by kubectl())
error = ""
# exit status of the most recent real kubectl run; -1 until one has happened
returncode = -1

# NOTE(review): not read anywhere in the code visible here - confirm where it is used
dry_run = False

# mock layer that kubectl() consults before launching the real command
mocker = tu.MockKube()
34+
35+
# execute kubectl commands
36+
# default namespace is nuvolaris, you can change with keyword arg namespace
37+
# default output is text
38+
# if you specify jsonpath it will filter and parse the json output
39+
# returns exceptions if errors
40+
def kubectl(*args, namespace="nuvolaris", input=None, jsonpath=None, debugresult=True, timeout=None):
    """Run a kubectl command and return its output.

    args: kubectl arguments, e.g. ("get", "pods").
    namespace: added as `-n <namespace>`; a falsy value omits the flag.
    input: data piped to kubectl's stdin; a str is encoded as UTF-8.
    jsonpath: when set, `-o jsonpath-as-json=<jsonpath>` is appended and
        stdout is parsed as JSON.
    debugresult: log the parsed JSON result at debug level.
    timeout: forwarded to subprocess.run.

    Returns stdout as text, or the parsed JSON when jsonpath is set. If that
    JSON cannot be parsed, the exception object is returned, not raised.
    A truthy answer from the mocker is returned without running kubectl.

    Raises Exception carrying kubectl's stderr on a non-zero exit status.
    """
    global returncode, output, error

    # support for mocked requests
    mres = mocker.invoke(*args)
    if mres:
        mocker.save(input)
        return mres

    cmd = ["kubectl", "-n", namespace] if namespace else ["kubectl"]
    cmd += list(args)
    if jsonpath:
        cmd += ["-o", "jsonpath-as-json=%s" % jsonpath]

    # stdin must be bytes; None and bytes pass through untouched
    if isinstance(input, str):
        input = input.encode('utf-8')

    # executing
    logging.debug(cmd)
    res = subprocess.run(cmd, capture_output=True, input=input, timeout=timeout)

    # keep the last result available at module level
    returncode = res.returncode
    output = res.stdout.decode()
    error = res.stderr.decode()

    if res.returncode != 0:
        logging.info(f"Error: kubectl {cmd} input='{input}' output='{output}' error='{error}'")
        raise Exception(error)

    if not jsonpath:
        return output

    try:
        parsed = json.loads(output)
        if debugresult:
            logging.debug("result: %s", json.dumps(parsed, indent=2))
        return parsed
    except Exception as e:
        # the parse failure is handed back to the caller instead of raised
        logging.info(output)
        logging.info(e)
        return e
80+
81+
# create a configmap from keyword arguments
82+
def configMap(name, **kwargs):
    """
    Build the YAML manifest of a ConfigMap called `name`; every keyword
    argument becomes one entry of its `data` section.

    The manifest is assembled as a dict rather than by formatting `name`
    into YAML text, so a name that YAML would read as a number, boolean or
    null (e.g. "on", "123") stays a string.

    >>> import nuvolaris.kube as kube, nuvolaris.testutil as tu
    >>> tu.grep(kube.configMap("hello", value="world"), "kind:|name:|value:", sort=True)
    kind: ConfigMap
    name: hello
    value: world
    >>> tu.grep(kube.configMap("hello", **{"file.js":"function", "file.py": "def"}), "file.", sort=True)
    file.js: function
    file.py: def
    """
    out = {
        "apiVersion": "v1",
        "kind": "ConfigMap",
        "metadata": {"name": str(name)},
        "data": dict(kwargs),
    }
    return yaml.dump(out)
102+
103+
# delete an object
104+
def delete(obj, namespace="nuvolaris"):
    """Delete a Kubernetes object with `kubectl delete -f -`.

    A string is sent to kubectl as is; anything else is serialized to JSON
    first. Returns whatever kubectl() returns.
    """
    # tested with apply
    manifest = obj if isinstance(obj, str) else json.dumps(obj)
    return kubectl("delete", "-f", "-", namespace=namespace, input=manifest)
109+
110+
# shortcut
111+
def ctl(arg, jsonpath='{@}', flatten=False):
    """Shortcut: run a kubectl command given as one whitespace-separated string.

    arg: command line without the leading `kubectl`, e.g. "get pods".
    jsonpath: JSONPath filter forwarded to kubectl().
    flatten: when true, wrap the result in flatdict.FlatterDict with "." as
        the key delimiter and return it as a plain dict.

    Returns what kubectl() returned, flattened on request.
    """
    data = kubectl(*arg.split(), jsonpath=jsonpath)
    if not flatten:
        return data
    # imported lazily so flatdict is required only when flattening is requested
    import flatdict
    return dict(flatdict.FlatterDict(data, delimiter="."))
117+
118+
# apply an object
119+
def apply(obj, namespace="nuvolaris"):
    """Apply a Kubernetes object with `kubectl apply -f -`.

    A string is sent to kubectl as is; anything else is dumped to
    block-style YAML with its key order kept. Returns whatever kubectl()
    returns.
    """
    manifest = obj if isinstance(obj, str) else yaml.dump(obj, default_flow_style=False, sort_keys=False)
    return kubectl("apply", "-f", "-", namespace=namespace, input=manifest)
123+
124+
# apply an expanded template
125+
def applyTemplate(name, data, namespace="nuvolaris"):
    """Expand template `name` with `data` and pipe the result to `kubectl apply -f -`."""
    return kubectl("apply", "-f", "-", namespace=namespace, input=tpl.expand_template(name, data))
128+
129+
# delete an expanded template
130+
def deleteTemplate(name, data, namespace="nuvolaris"):
    """Expand template `name` with `data` and pipe the result to `kubectl delete -f -`."""
    return kubectl("delete", "-f", "-", namespace=namespace, input=tpl.expand_template(name, data))
133+
134+
def get(name, namespace="nuvolaris"):
    """Fetch a resource with `kubectl get <name> -ojson` and decode it.

    Returns the decoded JSON, or None when kubectl fails or its output is
    not valid JSON.
    """
    try:
        return json.loads(kubectl("get", name, "-ojson", namespace=namespace))
    except Exception:
        # best effort; catching Exception lets KeyboardInterrupt and
        # SystemExit propagate
        return None
139+
140+
def get_pods(selector, namespace="nuvolaris"):
    """
    filter the existing pods using the given selector expression. (ex name=mongodb-kubernetes-operator)

    Returns the decoded JSON of `kubectl get pods --selector=<selector> -ojson`,
    or None when kubectl fails or its output is not valid JSON.
    """
    try:
        return json.loads(kubectl("get", "pods", f"--selector={selector}", "-ojson", namespace=namespace))
    except Exception:
        # best effort; catching Exception lets KeyboardInterrupt and
        # SystemExit propagate
        return None
148+
149+
def wait(name, condition, timeout="600s", namespace="nuvolaris"):
    """Run `kubectl wait <name> --for=<condition> --timeout=<timeout>`.

    timeout is inserted verbatim into the --timeout flag.

    Returns kubectl()'s output, or None when the command fails.
    """
    try:
        return kubectl("wait", name, f"--for={condition}", f"--timeout={timeout}", namespace=namespace)
    except Exception:
        # best effort; catching Exception lets KeyboardInterrupt and
        # SystemExit propagate
        return None
154+
155+
# patch an object
156+
def patch(name, data, namespace="nuvolaris", tpe="merge"):
    """Patch a resource with `kubectl patch <name> --type <tpe> -p <data>`.

    name: resource to patch.
    data: the patch; a string is sent as is, anything else is serialized to
        JSON.
    namespace: namespace of the resource; forwarded to kubectl().
    tpe: value of kubectl's --type flag.

    Returns kubectl()'s output; errors raised by kubectl() propagate.
    """
    if not isinstance(data, str):
        data = json.dumps(data)
    # namespace must be forwarded, otherwise kubectl() falls back to its own default
    return kubectl("patch", name, "--type", tpe, "-p", data, namespace=namespace)
161+
162+
def scale_sts(name, replicas, namespace="nuvolaris"):
    """Run `kubectl scale <name> --replicas=<replicas>`.

    Returns kubectl()'s output, or None when the command fails.
    """
    try:
        return kubectl("scale", name, f"--replicas={replicas}", namespace=namespace)
    except Exception:
        # best effort; catching Exception lets KeyboardInterrupt and
        # SystemExit propagate
        return None
167+
168+
# rollout the specified element. Normally used for DeamonSet or StatefulSet
169+
def rollout(name, namespace="nuvolaris"):
    """Run `kubectl rollout restart <name>`.

    Returns kubectl()'s output, or None when the command fails.
    """
    try:
        return kubectl("rollout", "restart", name, namespace=namespace)
    except Exception:
        # best effort; catching Exception lets KeyboardInterrupt and
        # SystemExit propagate
        return None
174+
175+
def detect_kind():
    """Check the `nuvolaris.io/kube` label of node `nuvolaris-control-plane`.

    The node is queried without a namespace flag. Returns a truthy value
    when the label query result contains "kind"; otherwise a falsy value,
    which may be the empty query result itself rather than False. Returns
    False when the lookup raises.
    """
    try:
        is_kind = kubectl("get", "node/nuvolaris-control-plane",
                          namespace=None,
                          jsonpath='{.metadata.labels.nuvolaris\\.io/kube}')
        return is_kind and "kind" in is_kind
    except Exception:
        # best effort; catching Exception lets KeyboardInterrupt and
        # SystemExit propagate
        return False

nuvolaris/kube.py.rej

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
--- nuvolaris/kube.py
2+
+++ nuvolaris/kube.py
3+
@@ -118,7 +118,12 @@ def flatten(data):
4+
# apply an object
5+
def apply(obj, namespace="nuvolaris"):
6+
if not isinstance(obj, str):
7+
- obj = yaml.dump(obj, default_flow_style=False, sort_keys=False)
8+
+ # Use SafeDumper and ensure strings are quoted
9+
+ class LiteralStr(str): pass
10+
+ def literal_str_representer(dumper, data):
11+
+ return dumper.represent_scalar('tag:yaml.org,2002:str', data, style='')
12+
+ yaml.add_representer(LiteralStr, literal_str_representer, Dumper=yaml.SafeDumper)
13+
+ obj = yaml.dump(obj, Dumper=yaml.SafeDumper, default_flow_style=False, sort_keys=False, allow_unicode=True)
14+
return kubectl("apply", "-f", "-", namespace=namespace, input=obj)
15+
# apply an expanded template
16+
def applyTemplate(name, data, namespace="nuvolaris"):

nuvolaris/kustomize.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ def kustom_list(where, *what, templates=[], data={}):
249249
"""
250250
yml = nku.kustomize(where, *what, templates=templates, data=data)
251251
stream = io.StringIO(yml)
252-
res = list(yaml.load_all(stream, yaml.Loader))
252+
res = list(yaml.load_all(stream, yaml.SafeLoader))
253253
return {"apiVersion": "v1", "kind": "List", "items": res }
254254

255255

@@ -267,13 +267,13 @@ def restricted_kustom_list(where, *what, templates=[], templates_filter=[], data
267267
"""
268268
yml = nku.restricted_kustomize(where, *what, templates=templates, templates_filter=templates_filter,data=data)
269269
stream = io.StringIO(yml)
270-
res = list(yaml.load_all(stream, yaml.Loader))
270+
res = list(yaml.load_all(stream, yaml.SafeLoader))
271271
return {"apiVersion": "v1", "kind": "List", "items": res }
272272

273273
# load the given yaml file under deploy/{where} folder
274274
def raw(where, yamlfile):
    """Load every YAML document of deploy/{where}/{yamlfile} into a list.

    Documents are parsed with PyYAML's safe loader, which builds plain
    Python data only and never instantiates arbitrary objects.
    """
    with open(f"deploy/{where}/{yamlfile}", 'r') as f:
        return list(yaml.safe_load_all(f))
277277

278278
def processTemplate(where,template,data,out_template=None):
279279
"""
@@ -286,7 +286,7 @@ def processTemplate(where,template,data,out_template=None):
286286

287287
ntp.spool_template(template, out, data)
288288
with open(out, 'r') as f:
289-
res = list(yaml.load_all(f, yaml.Loader))
289+
res = list(yaml.load_all(f, yaml.SafeLoader))
290290
return {"apiVersion": "v1", "kind": "List", "items": res }
291291

292292
def renderTemplate(where,template,data,out_template):

nuvolaris/spark.py

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,15 @@ def get_spark_config_data():
4242

4343
# Master configuration
4444
"master_replicas": cfg.get('spark.master.replicas', defval=1),
45-
"master_memory": cfg.get('spark.master.memory', defval='1g'),
45+
"master_memory": cfg.get('spark.master.memory', defval='1Gi'),
46+
"master_memory_jvm": _convert_k8s_memory_to_jvm(cfg.get('spark.master.memory', defval='1Gi')),
4647
"master_cpu": cfg.get('spark.master.cpu', defval='1000m'),
4748
"master_port": cfg.get('spark.master.port', defval=7077),
4849
"master_webui_port": cfg.get('spark.master.webui-port', defval=8080),
4950

5051
# Worker configuration
51-
"worker_replicas": cfg.get('spark.worker.replicas', defval=2),
52-
"worker_memory": cfg.get('spark.worker.memory', defval='2g'),
52+
"worker_memory": cfg.get('spark.worker.memory', defval='2Gi'),
53+
"worker_memory_jvm": _convert_k8s_memory_to_jvm(cfg.get('spark.worker.memory', defval='2Gi')),
5354
"worker_cpu": cfg.get('spark.worker.cpu', defval='2000m'),
5455
"worker_cores": cfg.get('spark.worker.cores', defval=2),
5556
"worker_webui_port": cfg.get('spark.worker.webui-port', defval=8081),
@@ -116,7 +117,7 @@ def create(owner=None):
116117
kus.processTemplate("spark", "spark-history-dep-tpl.yaml", data, "spark-history-dep.yaml")
117118

118119
# 3. Define kustomize patches (standard pattern)
119-
tplp = ["set-attach.yaml"]
120+
tplp = []
120121

121122
# 4. Add affinity/tolerations if enabled (standard pattern)
122123
if data.get('affinity') or data.get('tolerations'):
@@ -126,8 +127,8 @@ def create(owner=None):
126127
kust = kus.patchTemplates("spark", tplp, data)
127128

128129
# 6. Build complete specification using standard OpenServerless pattern
129-
templates = ["spark-rbac.yaml"] # Static Jinja2 templates to include
130-
templates_filter = ["spark-configmap.yaml", "spark-master-sts.yaml"] # Generated templates to filter
130+
templates = [] # Static non-Jinja2 templates to include
131+
templates_filter = ["spark-rbac.yaml", "spark-configmap.yaml", "spark-master-sts.yaml"] # Generated templates to filter
131132

132133
if data['history_enabled']:
133134
templates_filter.extend(["spark-history-pvc.yaml", "spark-history-dep.yaml"])
@@ -150,7 +151,7 @@ def create(owner=None):
150151
logging.info("waiting for spark master to be ready...")
151152
util.wait_for_pod_ready(
152153
"{.items[?(@.metadata.labels.component == 'spark-master')].metadata.name}",
153-
timeout=300
154+
timeout="300s"
154155
)
155156
logging.info("spark master is ready")
156157

@@ -512,7 +513,7 @@ def _validate_sparkjob_spec(spec, job_name):
512513
'executor': {
513514
'instances': 2,
514515
'cores': 1,
515-
'memory': '1g'
516+
'memory': '1Gi'
516517
}
517518
},
518519
'execution': {
@@ -566,7 +567,7 @@ def merge_dict(target, source):
566567

567568
def _convert_k8s_memory_to_jvm(k8s_memory):
568569
"""
569-
Convert Kubernetes memory format (like '1Gi') to JVM format (like '1g')
570+
Convert Kubernetes memory format (like '1Gi') to JVM format (like '1g')
570571
"""
571572
if k8s_memory.endswith('Gi'):
572573
return k8s_memory[:-2] + 'g'

nuvolaris/templates/spark-master-sts-tpl.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ spec:
6666
- name: SPARK_MASTER_WEBUI_PORT
6767
value: "{{master_webui_port}}"
6868
- name: SPARK_DAEMON_MEMORY
69-
value: "{{master_memory}}"
69+
value: "{{master_memory_jvm}}"
7070
- name: SPARK_NO_DAEMONIZE
7171
value: "true"
7272
resources:

olaris

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Subproject commit 6467dcbbcb67293e0a4a30f9a1046e63516a92dc

0 commit comments

Comments
 (0)