-
Notifications
You must be signed in to change notification settings - Fork 19
ray client is not supported with ray data -> /tutorials/beginner/execution_engine.html only works with local cluster #218
Copy link
Copy link
Open
Description
When running the "ray" example as provided here, the example fails if a remote cluster is used.
This ray issue seems related: ray-project/ray#41333
Repro-steps:
- set up a ray cluster, not on localhost
import os
import pandas as pd
from fugue import transform
os.environ["RAY_ADDRESS"] = "ray://<ray-cluster>:10001"
df = pd.DataFrame({"col1": [1,2,3,4], "col2": [1,2,3,4]})
# schema: *, col3:int
def add_cols(df:pd.DataFrame) -> pd.DataFrame:
    return df.assign(col3 = df['col1'] + df['col2'])

It would be nice if this were documented somewhere (or is there a fix for that?).
Error:
_0 _State.RUNNING -> _State.FAILED Global node is not initialized.
---------------------------------------------------------------------------
RuntimeError Traceback (most recent call last)
Cell In[4], line 3
1 ray.init(ignore_reinit_error=True)
----> 3 ray_df = transform(df, add_cols, engine="ray")
4 ray_df.show(5)
File /opt/conda/lib/python3.11/site-packages/fugue/workflow/api.py:174, in transform(df, using, schema, params, partition, callback, ignore_errors, persist, as_local, save_path, checkpoint, engine, engine_conf, as_fugue)
171 else:
172 tdf.save(save_path, fmt="parquet")
--> 174 dag.run(make_execution_engine(engine, conf=engine_conf, infer_by=[df]))
175 if checkpoint:
176 result = dag.yields["result"].result # type:ignore
File /opt/conda/lib/python3.11/site-packages/fugue/workflow/workflow.py:1604, in FugueWorkflow.run(self, engine, conf, **kwargs)
1602 if ctb is None: # pragma: no cover
1603 raise
-> 1604 raise ex.with_traceback(ctb)
1605 self._computed = True
1606 return FugueWorkflowResult(self.yields)
File /opt/conda/lib/python3.11/site-packages/fugue_ray/execution_engine.py:240, in RayExecutionEngine.to_df(self, df, schema)
239 def to_df(self, df: Any, schema: Any = None) -> DataFrame:
--> 240 return self._to_ray_df(df, schema=schema)
File /opt/conda/lib/python3.11/site-packages/fugue_ray/execution_engine.py:329, in RayExecutionEngine._to_ray_df(self, df, schema)
327 def _to_ray_df(self, df: Any, schema: Any = None) -> RayDataFrame:
328 # TODO: remove this in phase 2
--> 329 res = self._to_auto_df(df, schema)
330 if not isinstance(res, RayDataFrame):
331 return RayDataFrame(res)
File /opt/conda/lib/python3.11/site-packages/fugue_ray/execution_engine.py:342, in RayExecutionEngine._to_auto_df(self, df, schema)
337 assert_or_throw(
338 schema is None,
339 ValueError("schema must be None when df is a DataFrame"),
340 )
341 return df
--> 342 return RayDataFrame(df, schema)
File /opt/conda/lib/python3.11/site-packages/fugue_ray/dataframe.py:105, in RayDataFrame.__init__(self, df, schema, internal_schema)
103 else:
104 raise ValueError(f"{df} is incompatible with RayDataFrame")
--> 105 rdf, schema = self._apply_schema(rdf, schema, internal_schema)
106 super().__init__(schema)
107 self._native = rdf
File /opt/conda/lib/python3.11/site-packages/fugue_ray/dataframe.py:238, in RayDataFrame._apply_schema(self, rdf, schema, internal_schema)
236 if internal_schema:
237 return rdf, schema
--> 238 fmt, rdf = get_dataset_format(rdf)
239 if fmt is None: # empty
240 schema = _input_schema(schema).assert_not_empty()
File /opt/conda/lib/python3.11/site-packages/fugue_ray/_utils/dataframe.py:32, in get_dataset_format(df)
30 def get_dataset_format(df: rd.Dataset) -> Tuple[Optional[str], rd.Dataset]:
31 df = materialize(df)
---> 32 if df.count() == 0:
33 return None, df
34 if ray.__version__ < "2.5.0": # pragma: no cover
File /opt/conda/lib/python3.11/site-packages/ray/data/dataset.py:2598, in Dataset.count(self)
2595 return 0
2597 # For parquet, we can return the count directly from metadata.
-> 2598 meta_count = self._meta_count()
2599 if meta_count is not None:
2600 return meta_count
File /opt/conda/lib/python3.11/site-packages/ray/data/dataset.py:5108, in Dataset._meta_count(self)
5107 def _meta_count(self) -> Optional[int]:
-> 5108 return self._plan.meta_count()
File /opt/conda/lib/python3.11/site-packages/ray/data/_internal/plan.py:496, in ExecutionPlan.meta_count(self)
491 return None
492 elif self._in_blocks is not None and self._snapshot_blocks is None:
493 # If the plan only has input blocks, we execute it, so snapshot has output.
494 # This applies to newly created dataset. For example, initial dataset
495 # from read, and output datasets of Dataset.split().
--> 496 self.execute()
497 # Snapshot is now guaranteed to be the final block or None.
498 return self._get_num_rows_from_blocks_metadata(self._snapshot_blocks)
File /opt/conda/lib/python3.11/site-packages/ray/data/_internal/plan.py:628, in ExecutionPlan.execute(self, allow_clear_input_blocks, force_read, preserve_order)
621 metrics_tag = create_dataset_tag(
622 self._dataset_name, self._dataset_uuid
623 )
624 executor = StreamingExecutor(
625 copy.deepcopy(context.execution_options),
626 metrics_tag,
627 )
--> 628 blocks = execute_to_legacy_block_list(
629 executor,
630 self,
631 allow_clear_input_blocks=allow_clear_input_blocks,
632 dataset_uuid=self._dataset_uuid,
633 preserve_order=preserve_order,
634 )
635 stats = executor.get_stats()
636 stats_summary_string = stats.to_summary().to_string(
637 include_parent=False
638 )
File /opt/conda/lib/python3.11/site-packages/ray/data/_internal/execution/legacy_compat.py:125, in execute_to_legacy_block_list(executor, plan, allow_clear_input_blocks, dataset_uuid, preserve_order)
107 """Execute a plan with the new executor and translate it into a legacy block list.
108
109 Args:
(...)
117 The output as a legacy block list.
118 """
119 dag, stats = _get_execution_dag(
120 executor,
121 plan,
122 allow_clear_input_blocks,
123 preserve_order,
124 )
--> 125 bundles = executor.execute(dag, initial_stats=stats)
126 block_list = _bundles_to_block_list(bundles)
127 # Set the stats UUID after execution finishes.
File /opt/conda/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor.py:132, in StreamingExecutor.execute(self, dag, initial_stats)
129 self._global_info = ProgressBar("Running", dag.num_outputs_total())
131 self._output_node: OpState = self._topology[dag]
--> 132 StatsManager.register_dataset_to_stats_actor(
133 self._dataset_tag,
134 self._get_operator_tags(),
135 )
136 self.start()
137 self._execution_started = True
File /opt/conda/lib/python3.11/site-packages/ray/data/_internal/stats.py:531, in _StatsManager.register_dataset_to_stats_actor(self, dataset_tag, operator_tags)
530 def register_dataset_to_stats_actor(self, dataset_tag, operator_tags):
--> 531 self._stats_actor().register_dataset.remote(dataset_tag, operator_tags)
File /opt/conda/lib/python3.11/site-packages/ray/data/_internal/stats.py:414, in _StatsManager._stats_actor(self, create_if_not_exists)
412 def _stats_actor(self, create_if_not_exists=True) -> _StatsActor:
413 if ray._private.worker._global_node is None:
--> 414 raise RuntimeError("Global node is not initialized.")
415 current_cluster_id = ray._private.worker._global_node.cluster_id
416 if (
417 self._stats_actor_handle is None
418 or self._stats_actor_cluster_id != current_cluster_id
419 ):
RuntimeError: Global node is not initialized.
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels