Skip to content

ray client is not supported with ray data -> /tutorials/beginner/execution_engine.html only works with local cluster #218

@andnig

Description

@andnig

When running the "ray" example as provided here, the example fails if a remote cluster is used.

This ray issue seems related: ray-project/ray#41333

Repro-steps:

  1. set up a ray cluster, not on localhost
import os
import pandas as pd
from fugue import transform 

os.environ["RAY_ADDRESS"] = "ray://<ray-cluster>:10001"

df = pd.DataFrame({"col1": [1,2,3,4], "col2": [1,2,3,4]})

# schema: *, col3:int
def add_cols(df:pd.DataFrame) -> pd.DataFrame:
    return df.assign(col3 = df['col1'] + df['col2'])

It would be nice if this were documented somewhere (or is there a fix for it?).

Error:

_0 _State.RUNNING -> _State.FAILED  Global node is not initialized.
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
Cell In[4], line 3
      1 ray.init(ignore_reinit_error=True)
----> 3 ray_df = transform(df, add_cols, engine="ray")
      4 ray_df.show(5)

File /opt/conda/lib/python3.11/site-packages/fugue/workflow/api.py:174, in transform(df, using, schema, params, partition, callback, ignore_errors, persist, as_local, save_path, checkpoint, engine, engine_conf, as_fugue)
    171     else:
    172         tdf.save(save_path, fmt="parquet")
--> 174 dag.run(make_execution_engine(engine, conf=engine_conf, infer_by=[df]))
    175 if checkpoint:
    176     result = dag.yields["result"].result  # type:ignore

File /opt/conda/lib/python3.11/site-packages/fugue/workflow/workflow.py:1604, in FugueWorkflow.run(self, engine, conf, **kwargs)
   1602             if ctb is None:  # pragma: no cover
   1603                 raise
-> 1604             raise ex.with_traceback(ctb)
   1605         self._computed = True
   1606 return FugueWorkflowResult(self.yields)

File /opt/conda/lib/python3.11/site-packages/fugue_ray/execution_engine.py:240, in RayExecutionEngine.to_df(self, df, schema)
    239 def to_df(self, df: Any, schema: Any = None) -> DataFrame:
--> 240     return self._to_ray_df(df, schema=schema)

File /opt/conda/lib/python3.11/site-packages/fugue_ray/execution_engine.py:329, in RayExecutionEngine._to_ray_df(self, df, schema)
    327 def _to_ray_df(self, df: Any, schema: Any = None) -> RayDataFrame:
    328     # TODO: remove this in phase 2
--> 329     res = self._to_auto_df(df, schema)
    330     if not isinstance(res, RayDataFrame):
    331         return RayDataFrame(res)

File /opt/conda/lib/python3.11/site-packages/fugue_ray/execution_engine.py:342, in RayExecutionEngine._to_auto_df(self, df, schema)
    337     assert_or_throw(
    338         schema is None,
    339         ValueError("schema must be None when df is a DataFrame"),
    340     )
    341     return df
--> 342 return RayDataFrame(df, schema)

File /opt/conda/lib/python3.11/site-packages/fugue_ray/dataframe.py:105, in RayDataFrame.__init__(self, df, schema, internal_schema)
    103 else:
    104     raise ValueError(f"{df} is incompatible with RayDataFrame")
--> 105 rdf, schema = self._apply_schema(rdf, schema, internal_schema)
    106 super().__init__(schema)
    107 self._native = rdf

File /opt/conda/lib/python3.11/site-packages/fugue_ray/dataframe.py:238, in RayDataFrame._apply_schema(self, rdf, schema, internal_schema)
    236 if internal_schema:
    237     return rdf, schema
--> 238 fmt, rdf = get_dataset_format(rdf)
    239 if fmt is None:  # empty
    240     schema = _input_schema(schema).assert_not_empty()

File /opt/conda/lib/python3.11/site-packages/fugue_ray/_utils/dataframe.py:32, in get_dataset_format(df)
     30 def get_dataset_format(df: rd.Dataset) -> Tuple[Optional[str], rd.Dataset]:
     31     df = materialize(df)
---> 32     if df.count() == 0:
     33         return None, df
     34     if ray.__version__ < "2.5.0":  # pragma: no cover

File /opt/conda/lib/python3.11/site-packages/ray/data/dataset.py:2598, in Dataset.count(self)
   2595     return 0
   2597 # For parquet, we can return the count directly from metadata.
-> 2598 meta_count = self._meta_count()
   2599 if meta_count is not None:
   2600     return meta_count

File /opt/conda/lib/python3.11/site-packages/ray/data/dataset.py:5108, in Dataset._meta_count(self)
   5107 def _meta_count(self) -> Optional[int]:
-> 5108     return self._plan.meta_count()

File /opt/conda/lib/python3.11/site-packages/ray/data/_internal/plan.py:496, in ExecutionPlan.meta_count(self)
    491     return None
    492 elif self._in_blocks is not None and self._snapshot_blocks is None:
    493     # If the plan only has input blocks, we execute it, so snapshot has output.
    494     # This applies to newly created dataset. For example, initial dataset
    495     # from read, and output datasets of Dataset.split().
--> 496     self.execute()
    497 # Snapshot is now guaranteed to be the final block or None.
    498 return self._get_num_rows_from_blocks_metadata(self._snapshot_blocks)

File /opt/conda/lib/python3.11/site-packages/ray/data/_internal/plan.py:628, in ExecutionPlan.execute(self, allow_clear_input_blocks, force_read, preserve_order)
    621 metrics_tag = create_dataset_tag(
    622     self._dataset_name, self._dataset_uuid
    623 )
    624 executor = StreamingExecutor(
    625     copy.deepcopy(context.execution_options),
    626     metrics_tag,
    627 )
--> 628 blocks = execute_to_legacy_block_list(
    629     executor,
    630     self,
    631     allow_clear_input_blocks=allow_clear_input_blocks,
    632     dataset_uuid=self._dataset_uuid,
    633     preserve_order=preserve_order,
    634 )
    635 stats = executor.get_stats()
    636 stats_summary_string = stats.to_summary().to_string(
    637     include_parent=False
    638 )

File /opt/conda/lib/python3.11/site-packages/ray/data/_internal/execution/legacy_compat.py:125, in execute_to_legacy_block_list(executor, plan, allow_clear_input_blocks, dataset_uuid, preserve_order)
    107 """Execute a plan with the new executor and translate it into a legacy block list.
    108 
    109 Args:
   (...)
    117     The output as a legacy block list.
    118 """
    119 dag, stats = _get_execution_dag(
    120     executor,
    121     plan,
    122     allow_clear_input_blocks,
    123     preserve_order,
    124 )
--> 125 bundles = executor.execute(dag, initial_stats=stats)
    126 block_list = _bundles_to_block_list(bundles)
    127 # Set the stats UUID after execution finishes.

File /opt/conda/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor.py:132, in StreamingExecutor.execute(self, dag, initial_stats)
    129     self._global_info = ProgressBar("Running", dag.num_outputs_total())
    131 self._output_node: OpState = self._topology[dag]
--> 132 StatsManager.register_dataset_to_stats_actor(
    133     self._dataset_tag,
    134     self._get_operator_tags(),
    135 )
    136 self.start()
    137 self._execution_started = True

File /opt/conda/lib/python3.11/site-packages/ray/data/_internal/stats.py:531, in _StatsManager.register_dataset_to_stats_actor(self, dataset_tag, operator_tags)
    530 def register_dataset_to_stats_actor(self, dataset_tag, operator_tags):
--> 531     self._stats_actor().register_dataset.remote(dataset_tag, operator_tags)

File /opt/conda/lib/python3.11/site-packages/ray/data/_internal/stats.py:414, in _StatsManager._stats_actor(self, create_if_not_exists)
    412 def _stats_actor(self, create_if_not_exists=True) -> _StatsActor:
    413     if ray._private.worker._global_node is None:
--> 414         raise RuntimeError("Global node is not initialized.")
    415     current_cluster_id = ray._private.worker._global_node.cluster_id
    416     if (
    417         self._stats_actor_handle is None
    418         or self._stats_actor_cluster_id != current_cluster_id
    419     ):

RuntimeError: Global node is not initialized.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions