@@ -663,7 +663,7 @@ def test_join():
663663 df1 = ctx .create_dataframe ([[batch ]], "r" )
664664
665665 df2 = df .join (df1 , on = "a" , how = "inner" )
666- df2 = df2 .sort (column ("l. a" ))
666+ df2 = df2 .sort (column ("a" ))
667667 table = pa .Table .from_batches (df2 .collect ())
668668
669669 expected = {"a" : [1 , 2 ], "c" : [8 , 10 ], "b" : [4 , 5 ]}
@@ -673,16 +673,18 @@ def test_join():
673673 # Since we may have a duplicate column name and pa.Table()
674674 # hides the fact, instead we need to explicitly check the
675675 # resultant arrays.
676- df2 = df .join (df1 , left_on = "a" , right_on = "a" , how = "inner" , drop_duplicate_keys = True )
677- df2 = df2 .sort (column ("l.a" ))
676+ df2 = df .join (
677+ df1 , left_on = "a" , right_on = "a" , how = "inner" , coalesce_duplicate_keys = True
678+ )
679+ df2 = df2 .sort (column ("a" ))
678680 result = df2 .collect ()[0 ]
679681 assert result .num_columns == 3
680682 assert result .column (0 ) == pa .array ([1 , 2 ], pa .int64 ())
681683 assert result .column (1 ) == pa .array ([4 , 5 ], pa .int64 ())
682684 assert result .column (2 ) == pa .array ([8 , 10 ], pa .int64 ())
683685
684686 df2 = df .join (
685- df1 , left_on = "a" , right_on = "a" , how = "inner" , drop_duplicate_keys = False
687+ df1 , left_on = "a" , right_on = "a" , how = "inner" , coalesce_duplicate_keys = False
686688 )
687689 df2 = df2 .sort (column ("l.a" ))
688690 result = df2 .collect ()[0 ]
@@ -695,7 +697,7 @@ def test_join():
695697 # Verify we don't make a breaking change to pre-43.0.0
696698 # where users would pass join_keys as a positional argument
697699 df2 = df .join (df1 , (["a" ], ["a" ]), how = "inner" )
698- df2 = df2 .sort (column ("l. a" ))
700+ df2 = df2 .sort (column ("a" ))
699701 table = pa .Table .from_batches (df2 .collect ())
700702
701703 expected = {"a" : [1 , 2 ], "c" : [8 , 10 ], "b" : [4 , 5 ]}
@@ -720,7 +722,7 @@ def test_join_invalid_params():
720722 with pytest .deprecated_call ():
721723 df2 = df .join (df1 , join_keys = (["a" ], ["a" ]), how = "inner" )
722724 df2 .show ()
723- df2 = df2 .sort (column ("l. a" ))
725+ df2 = df2 .sort (column ("a" ))
724726 table = pa .Table .from_batches (df2 .collect ())
725727
726728 expected = {"a" : [1 , 2 ], "c" : [8 , 10 ], "b" : [4 , 5 ]}
@@ -778,6 +780,35 @@ def test_join_on():
778780 assert table .to_pydict () == expected
779781
780782
def test_join_full_with_drop_duplicate_keys():
    """Full outer join on disjoint keys with ``coalesce_duplicate_keys=True``.

    The left frame holds the even ``log_time`` values and the right frame
    holds the odd ones, so no row has a partner on the other side. The
    test checks that the two ``log_time`` key columns are merged into a
    single column containing every key, and that the right-hand
    ``key_frame`` payload is null for rows that exist only on the left.

    The function name keeps the older ``drop_duplicate_keys`` wording; the
    keyword argument exercised here is ``coalesce_duplicate_keys``.
    """
    ctx = SessionContext()

    # Right side of the join: odd timestamps, each flagged as a key frame.
    key_frame_batch = pa.RecordBatch.from_arrays(
        [pa.array([1, 3, 5, 7, 9]), pa.array([True, True, True, True, True])],
        names=["log_time", "key_frame"],
    )
    key_frame = ctx.create_dataframe([[key_frame_batch]])

    # Left side of the join: even timestamps only, no payload column.
    query_batch = pa.RecordBatch.from_arrays(
        [pa.array([2, 4, 6, 8, 10])],
        names=["log_time"],
    )
    query_times = ctx.create_dataframe([[query_batch]])

    merged = query_times.join(
        key_frame,
        left_on="log_time",
        right_on="log_time",
        how="full",
        coalesce_duplicate_keys=True,
    )
    # Sort on the unqualified key name, which only resolves unambiguously
    # once the duplicate key columns have been merged into one.
    merged = merged.sort(column("log_time"))
    result = merged.collect()[0]

    # One merged key column plus the right-hand payload column.
    assert result.num_columns == 2
    # Every key from both sides must be present, with no nulls.
    assert result.column(0).to_pylist() == [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    # Odd keys come from ``key_frame`` (True); even keys exist only in
    # ``query_times`` and therefore have no payload.
    assert result.column(1).to_pylist() == [True, None] * 5
810+
811+
781812def test_join_on_invalid_expr ():
782813 ctx = SessionContext ()
783814
0 commit comments