From 63507beacd195d0c00d0632c85054333becf029e Mon Sep 17 00:00:00 2001 From: Roland Walker Date: Sat, 14 Mar 2026 10:39:46 -0400 Subject: [PATCH] add collation completions; more charset completion Complete in these positions: * "string" COLLATE ^ * CONVERT("string" USING ^ * CAST("string" as CHAR CHARACTER SET ^ Refactor stored charsets to not be per-schema, which was not necessary. Add commentary on the pre-existing issue of the WHERE logic short- circuiting other useful completions, and some other edge cases such as overenthusiastic blocking of numeric completions. --- changelog.md | 2 +- mycli/completion_refresher.py | 5 ++ mycli/packages/completion_engine.py | 68 ++++++++++++++---- mycli/sqlcompleter.py | 42 ++++++++--- mycli/sqlexecute.py | 16 +++++ test/test_completion_engine.py | 40 +++++++---- test/test_completion_refresher.py | 3 +- ...est_smart_completion_public_schema_only.py | 70 +++++++++++++++++++ 8 files changed, 210 insertions(+), 36 deletions(-) diff --git a/changelog.md b/changelog.md index 1b63d851..afef337e 100644 --- a/changelog.md +++ b/changelog.md @@ -4,6 +4,7 @@ Upcoming (TBD) Features --------- * Add prompt format string for literal backslash. +* Add collation completions, and complete charsets in more positions. Bug Fixes @@ -11,7 +12,6 @@ Bug Fixes * Suppress warnings when `sqlglotrs` is installed. 
- 1.64.0 (2026/03/13) ============== diff --git a/mycli/completion_refresher.py b/mycli/completion_refresher.py index f34c5b89..38b547b2 100644 --- a/mycli/completion_refresher.py +++ b/mycli/completion_refresher.py @@ -165,6 +165,11 @@ def refresh_character_sets(completer: SQLCompleter, executor: SQLExecute) -> Non completer.extend_character_sets(executor.character_sets()) +@refresher("collations") +def refresh_collations(completer: SQLCompleter, executor: SQLExecute) -> None: + completer.extend_collations(executor.collations()) + + @refresher("special_commands") def refresh_special(completer: SQLCompleter, executor: SQLExecute) -> None: completer.extend_special_commands(list(COMMANDS.keys())) diff --git a/mycli/packages/completion_engine.py b/mycli/packages/completion_engine.py index c8b3d40e..c03a3326 100644 --- a/mycli/packages/completion_engine.py +++ b/mycli/packages/completion_engine.py @@ -39,6 +39,23 @@ def _enum_value_suggestion(text_before_cursor: str, full_text: str) -> dict[str, } +def _charset_suggestion(tokens: list[Token]) -> list[dict[str, str]] | None: + token_values = [token.value.lower() for token in tokens if token.value] + + if len(token_values) >= 2 and token_values[-1] == 'set' and token_values[-2] == 'character': + return [{'type': 'character_set'}] + if len(token_values) >= 3 and token_values[-2] == 'set' and token_values[-3] == 'character': + return [{'type': 'character_set'}] + if len(token_values) >= 5 and token_values[-1] == 'using' and token_values[-4] == 'convert': + return [{'type': 'character_set'}] + if len(token_values) >= 6 and token_values[-2] == 'using' and token_values[-5] == 'convert': + return [{'type': 'character_set'}] + if len(token_values) >= 1 and token_values[-1] == 'collate': + return [{'type': 'collation'}] + + return None + + def _is_where_or_having(token: Token | None) -> bool: return bool(token and token.value and token.value.lower() in ("where", "having")) @@ -261,6 +278,7 @@ def suggest_based_on_last_token( # 
don't suggest anything inside a string or number if word_before_cursor: + # todo: example where this fails: completing on COLLATE with string "0900" if re.match(r'^[\d\.]', word_before_cursor[0]): return [] # more efficient if no space was typed yet in the string @@ -272,6 +290,14 @@ def suggest_based_on_last_token( if is_inside_quotes(text_before_cursor, -1) in ['single', 'double']: return [] + try: + # todo: pass in the complete list of tokens to avoid multiple parsing passes + parsed = sqlparse.parse(text_before_cursor)[0] + tokens_wo_space = [x for x in parsed.tokens if x.ttype != sqlparse.tokens.Token.Text.Whitespace] + except (AttributeError, IndexError, ValueError, sqlparse.exceptions.SQLParseError): + parsed = sqlparse.sql.Statement() + tokens_wo_space = [] + if isinstance(token, str): token_v = token.lower() elif isinstance(token, Comparison): @@ -286,7 +312,15 @@ def suggest_based_on_last_token( # sqlparse groups all tokens from the where clause into a single token # list. This means that token.value may be something like # 'where foo > 5 and '. We need to look "inside" token.tokens to handle - # suggestions in complicated where clauses correctly + # suggestions in complicated where clauses correctly. + # + # This logic also needs to look even deeper into the WHERE clause. + # We recapitulate some transcoding suggestions here, but cannot + # recapitulate the entire logic of this function. 
+ where_tokens = [x for x in token.tokens if x.ttype != sqlparse.tokens.Token.Text.Whitespace] + if transcoding_suggestion := _charset_suggestion(where_tokens): + return transcoding_suggestion + original_text = text_before_cursor prev_keyword, text_before_cursor = find_prev_keyword(text_before_cursor) enum_suggestion = _enum_value_suggestion(original_text, full_text) @@ -303,12 +337,12 @@ def suggest_based_on_last_token( if not token: return [{"type": "keyword"}, {"type": "special"}] - elif token_v == "*": + + if token_v == "*": return [{"type": "keyword"}] - elif token_v.endswith("("): - p = sqlparse.parse(text_before_cursor)[0] - if p.tokens and isinstance(p.tokens[-1], Where): + if token_v.endswith("("): + if parsed.tokens and isinstance(parsed.tokens[-1], Where): # Four possibilities: # 1 - Parenthesized clause like "WHERE foo AND (" # Suggest columns/functions @@ -323,7 +357,7 @@ def suggest_based_on_last_token( column_suggestions = suggest_based_on_last_token("where", text_before_cursor, None, full_text, identifier) # Check for a subquery expression (cases 3 & 4) - where = p.tokens[-1] + where = parsed.tokens[-1] _idx, prev_tok = where.token_prev(len(where.tokens) - 1) if isinstance(prev_tok, Comparison): @@ -337,25 +371,29 @@ def suggest_based_on_last_token( return column_suggestions # Get the token before the parens - idx, prev_tok = p.token_prev(len(p.tokens) - 1) + idx, prev_tok = parsed.token_prev(len(parsed.tokens) - 1) if prev_tok and prev_tok.value and prev_tok.value.lower() == "using": # tbl1 INNER JOIN tbl2 USING (col1, col2) tables = extract_tables(full_text) # suggest columns that are present in more than one table return [{"type": "column", "tables": tables, "drop_unique": True}] - elif p.token_first().value.lower() == "select": + elif parsed.tokens and parsed.token_first().value.lower() == "select": # If the lparen is preceeded by a space chances are we're about to # do a sub-select. 
if last_word(text_before_cursor, "all_punctuations").startswith("("): return [{"type": "keyword"}] - elif p.token_first().value.lower() == "show": + elif parsed.tokens and parsed.token_first().value.lower() == "show": return [{"type": "show"}] # We're probably in a function argument list return [{"type": "column", "tables": extract_tables(full_text)}] elif token_v in ("call"): return [{"type": "procedure", "schema": []}] + elif token_v == 'set' and len(tokens_wo_space) >= 3 and tokens_wo_space[-3].value.lower() == 'character': + return [{'type': 'character_set'}] + elif token_v == 'set' and len(tokens_wo_space) >= 2 and tokens_wo_space[-2].value.lower() == 'character': + return [{'type': 'character_set'}] elif token_v in ("set", "order by", "distinct"): return [{"type": "column", "tables": extract_tables(full_text)}] elif token_v == "as": @@ -364,13 +402,19 @@ def suggest_based_on_last_token( elif token_v in ("show"): return [{"type": "show"}] elif token_v in ("to",): - p = sqlparse.parse(text_before_cursor)[0] - if p.token_first().value.lower() == "change": + if parsed.tokens and parsed.token_first().value.lower() == "change": return [{"type": "change"}] else: return [{"type": "user"}] elif token_v in ("user", "for"): return [{"type": "user"}] + elif token_v == 'collate': + return [{'type': 'collation'}] + # some duplication with _charset_suggestion() + elif token_v == 'using' and len(tokens_wo_space) >= 5 and tokens_wo_space[-5].value.lower() == 'convert': + return [{'type': 'character_set'}] + elif token_v == 'using' and len(tokens_wo_space) >= 4 and tokens_wo_space[-4].value.lower() == 'convert': + return [{'type': 'character_set'}] elif token_v in ("select", "where", "having"): # Check for a table alias or schema qualification parent = (identifier and identifier.get_parent_name()) or [] @@ -399,7 +443,7 @@ def suggest_based_on_last_token( return [ {"type": "column", "tables": tables}, {"type": "function", "schema": []}, - {"type": "introducer", 
"schema": []}, + {"type": "introducer"}, {"type": "alias", "aliases": aliases}, ] elif ( diff --git a/mycli/sqlcompleter.py b/mycli/sqlcompleter.py index 112effae..ba897398 100644 --- a/mycli/sqlcompleter.py +++ b/mycli/sqlcompleter.py @@ -927,6 +927,10 @@ class SQLCompleter(Completer): users: list[str] = [] + character_sets: list[str] = [] + + collations: list[str] = [] + def __init__( self, smart_completion: bool = True, @@ -1087,16 +1091,22 @@ def extend_procedures(self, procedure_data: Generator[tuple]) -> None: metadata[self.dbname][elt[0]] = None def extend_character_sets(self, character_set_data: Generator[tuple]) -> None: - metadata = self.dbmetadata["character_sets"] - if self.dbname not in metadata: - metadata[self.dbname] = {} - for elt in character_set_data: if not elt: continue if not elt[0]: continue - metadata[self.dbname][elt[0]] = None + self.character_sets.append(elt[0]) + self.all_completions.add(elt[0]) + + def extend_collations(self, collation_data: Generator[tuple]) -> None: + for elt in collation_data: + if not elt: + continue + if not elt[0]: + continue + self.collations.append(elt[0]) + self.all_completions.add(elt[0]) def set_dbname(self, dbname: str | None) -> None: self.dbname = dbname or '' @@ -1104,6 +1114,8 @@ def set_dbname(self, dbname: str | None) -> None: def reset_completions(self) -> None: self.databases: list[str] = [] self.users: list[str] = [] + self.character_sets: list[str] = [] + self.collations: list[str] = [] self.show_items: list[Completion] = [] self.dbname = "" self.dbmetadata: dict[str, Any] = { @@ -1111,7 +1123,6 @@ def reset_completions(self) -> None: "views": {}, "functions": {}, "procedures": {}, - "character_sets": {}, "enum_values": {}, } self.all_completions = set(self.keywords + self.functions) @@ -1321,8 +1332,7 @@ def get_completions( completions.extend([(*x, rank) for x in procs_m]) elif suggestion['type'] == 'introducer': - charsets = self.populate_schema_objects(suggestion['schema'], 
'character_sets') - introducers = [f'_{x}' for x in charsets] + introducers = [f'_{x}' for x in self.character_sets] introducers_m = self.find_matches( word_before_cursor, introducers, @@ -1330,6 +1340,22 @@ def get_completions( ) completions.extend([(*x, rank) for x in introducers_m]) + elif suggestion['type'] == 'character_set': + charsets_m = self.find_matches( + word_before_cursor, + self.character_sets, + text_before_cursor=document.text_before_cursor, + ) + completions.extend([(*x, rank) for x in charsets_m]) + + elif suggestion['type'] == 'collation': + collations_m = self.find_matches( + word_before_cursor, + self.collations, + text_before_cursor=document.text_before_cursor, + ) + completions.extend([(*x, rank) for x in collations_m]) + elif suggestion["type"] == "table": # If this is a select and columns are given, parse the columns and # then only return tables that have one or more of the given columns. diff --git a/mycli/sqlexecute.py b/mycli/sqlexecute.py index 18c5e689..16b0f04d 100644 --- a/mycli/sqlexecute.py +++ b/mycli/sqlexecute.py @@ -105,6 +105,8 @@ class SQLExecute: character_sets_query = '''SHOW CHARACTER SET''' + collations_query = '''SHOW COLLATION''' + table_columns_query = """select TABLE_NAME, COLUMN_NAME from information_schema.columns where table_schema = %s order by table_name,ordinal_position""" @@ -482,6 +484,20 @@ def character_sets(self) -> Generator[tuple, None, None]: else: yield from cur + def collations(self) -> Generator[tuple, None, None]: + """Yields tuples of (collation_name, )""" + + assert isinstance(self.conn, Connection) + with self.conn.cursor() as cur: + _logger.debug("Collations Query. 
sql: %r", self.collations_query) + try: + cur.execute(self.collations_query) + except pymysql.DatabaseError as e: + _logger.error('No collations completions due to %r', e) + yield () + else: + yield from cur + def show_candidates(self) -> Generator[tuple, None, None]: assert isinstance(self.conn, Connection) with self.conn.cursor() as cur: diff --git a/test/test_completion_engine.py b/test/test_completion_engine.py index 0d62e65a..6c33649b 100644 --- a/test/test_completion_engine.py +++ b/test/test_completion_engine.py @@ -21,7 +21,7 @@ def test_select_suggests_cols_with_visible_table_scope(): {"type": "alias", "aliases": ["tabl"]}, {"type": "column", "tables": [(None, "tabl", None)]}, {"type": "function", "schema": []}, - {"type": "introducer", "schema": []}, + {"type": "introducer"}, ]) @@ -31,7 +31,7 @@ def test_select_suggests_cols_with_qualified_table_scope(): {"type": "alias", "aliases": ["tabl"]}, {"type": "column", "tables": [("sch", "tabl", None)]}, {"type": "function", "schema": []}, - {"type": "introducer", "schema": []}, + {"type": "introducer"}, ]) @@ -55,7 +55,7 @@ def test_where_suggests_columns_functions(expression): {"type": "alias", "aliases": ["tabl"]}, {"type": "column", "tables": [(None, "tabl", None)]}, {"type": "function", "schema": []}, - {"type": "introducer", "schema": []}, + {"type": "introducer"}, ]) @@ -67,7 +67,7 @@ def test_where_equals_suggests_enum_values_first(): {"type": "alias", "aliases": ["tabl"]}, {"type": "column", "tables": [(None, "tabl", None)]}, {"type": "function", "schema": []}, - {"type": "introducer", "schema": []}, + {"type": "introducer"}, ]) @@ -84,7 +84,7 @@ def test_where_in_suggests_columns(expression): {"type": "alias", "aliases": ["tabl"]}, {"type": "column", "tables": [(None, "tabl", None)]}, {"type": "function", "schema": []}, - {"type": "introducer", "schema": []}, + {"type": "introducer"}, ]) @@ -95,10 +95,22 @@ def test_where_equals_any_suggests_columns_or_keywords(): {"type": "alias", "aliases": 
["tabl"]}, {"type": "column", "tables": [(None, "tabl", None)]}, {"type": "function", "schema": []}, - {"type": "introducer", "schema": []}, + {"type": "introducer"}, ]) +def test_where_convert_using_suggests_character_set(): + text = 'SELECT * FROM tabl WHERE CONVERT(foo USING ' + suggestions = suggest_type(text, text) + assert suggestions == [{"type": "character_set"}] + + +def test_where_cast_character_set_suggests_character_set(): + text = 'SELECT * FROM tabl WHERE CAST(foo AS CHAR CHARACTER SET ' + suggestions = suggest_type(text, text) + assert suggestions == [{"type": "character_set"}] + + def test_lparen_suggests_cols(): suggestion = suggest_type("SELECT MAX( FROM tbl", "SELECT MAX(") assert suggestion == [{"type": "column", "tables": [(None, "tbl", None)]}] @@ -120,7 +132,7 @@ def test_select_suggests_cols_and_funcs(): {"type": "alias", "aliases": []}, {"type": "column", "tables": []}, {"type": "function", "schema": []}, - {"type": "introducer", "schema": []}, + {"type": "introducer"}, ]) @@ -193,7 +205,7 @@ def test_col_comma_suggests_cols(): {"type": "alias", "aliases": ["tbl"]}, {"type": "column", "tables": [(None, "tbl", None)]}, {"type": "function", "schema": []}, - {"type": "introducer", "schema": []}, + {"type": "introducer"}, ]) @@ -236,7 +248,7 @@ def test_partially_typed_col_name_suggests_col_names(): {"type": "alias", "aliases": ["tabl"]}, {"type": "column", "tables": [(None, "tabl", None)]}, {"type": "function", "schema": []}, - {"type": "introducer", "schema": []}, + {"type": "introducer"}, ]) @@ -331,7 +343,7 @@ def test_sub_select_col_name_completion(): {"type": "alias", "aliases": ["abc"]}, {"type": "column", "tables": [(None, "abc", None)]}, {"type": "function", "schema": []}, - {"type": "introducer", "schema": []}, + {"type": "introducer"}, ]) @@ -341,7 +353,7 @@ def test_sub_select_multiple_col_name_completion(): assert sorted_dicts(suggestions) == sorted_dicts([ {"type": "column", "tables": [(None, "abc", None)]}, {"type": "function", 
"schema": []}, - {"type": "introducer", "schema": []}, + {"type": "introducer"}, ]) @@ -485,7 +497,7 @@ def test_2_statements_2nd_current(): {"type": "alias", "aliases": ["b"]}, {"type": "column", "tables": [(None, "b", None)]}, {"type": "function", "schema": []}, - {"type": "introducer", "schema": []}, + {"type": "introducer"}, ]) # Should work even if first statement is invalid @@ -510,7 +522,7 @@ def test_2_statements_1st_current(): {"type": "alias", "aliases": ["a"]}, {"type": "column", "tables": [(None, "a", None)]}, {"type": "function", "schema": []}, - {"type": "introducer", "schema": []}, + {"type": "introducer"}, ]) @@ -527,7 +539,7 @@ def test_3_statements_2nd_current(): {"type": "alias", "aliases": ["b"]}, {"type": "column", "tables": [(None, "b", None)]}, {"type": "function", "schema": []}, - {"type": "introducer", "schema": []}, + {"type": "introducer"}, ]) diff --git a/test/test_completion_refresher.py b/test/test_completion_refresher.py index fbf5e88a..e7ed35b2 100644 --- a/test/test_completion_refresher.py +++ b/test/test_completion_refresher.py @@ -30,7 +30,8 @@ def test_ctor(refresher): "users", "functions", "procedures", - "character_sets", + 'character_sets', + 'collations', "special_commands", "show_commands", "keywords", diff --git a/test/test_smart_completion_public_schema_only.py b/test/test_smart_completion_public_schema_only.py index 6a9db9ba..bf4e729f 100644 --- a/test/test_smart_completion_public_schema_only.py +++ b/test/test_smart_completion_public_schema_only.py @@ -135,6 +135,76 @@ def test_introducer_completion(completer, complete_event): assert '_utf8mb4' in result_text +def test_collation_completion(completer, complete_event): + completer.extend_collations([('utf16le_bin',), ('utf8mb4_unicode_ci',)]) + text = 'SELECT "text" COLLATE ' + position = len(text) + result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event)) + result_text = [item.text for item in result] + assert 'utf16le_bin' 
in result_text + assert 'utf8mb4_unicode_ci' in result_text + + +def test_transcoding_completion_1(completer, complete_event): + completer.extend_character_sets([('latin1',), ('utf8mb4',)]) + text = 'SELECT CONVERT("text" USING ' + position = len(text) + result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event)) + result_text = [item.text for item in result] + assert 'latin1' in result_text + assert 'utf8mb4' in result_text + + +def test_transcoding_completion_2(completer, complete_event): + completer.extend_character_sets([('utf8mb3',), ('utf8mb4',)]) + text = 'SELECT CONVERT("text" USING u' + position = len(text) + result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event)) + result_text = [item.text for item in result] + assert 'utf8mb3' in result_text + assert 'utf8mb4' in result_text + + +def test_transcoding_completion_3(completer, complete_event): + completer.extend_character_sets([('latin1',), ('utf8mb4',)]) + text = 'SELECT CAST("text" AS CHAR CHARACTER SET ' + position = len(text) + result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event)) + result_text = [item.text for item in result] + assert 'latin1' in result_text + assert 'utf8mb4' in result_text + + +def test_transcoding_completion_4(completer, complete_event): + completer.extend_character_sets([('utf8mb3',), ('utf8mb4',)]) + text = 'SELECT CAST("text" AS CHAR CHARACTER SET u' + position = len(text) + result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event)) + result_text = [item.text for item in result] + assert 'utf8mb3' in result_text + assert 'utf8mb4' in result_text + + +def test_where_transcoding_completion_1(completer, complete_event): + completer.extend_character_sets([('latin1',), ('utf8mb4',)]) + text = 'SELECT * FROM users WHERE CONVERT(email USING ' + position = len(text) + result = 
list(completer.get_completions(Document(text=text, cursor_position=position), complete_event)) + result_text = [item.text for item in result] + assert 'latin1' in result_text + assert 'utf8mb4' in result_text + + +def test_where_transcoding_completion_2(completer, complete_event): + completer.extend_character_sets([('latin1',), ('utf8mb4',)]) + text = 'SELECT * FROM users WHERE CAST(email AS CHAR CHARACTER SET ' + position = len(text) + result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event)) + result_text = [item.text for item in result] + assert 'latin1' in result_text + assert 'utf8mb4' in result_text + + def test_table_completion(completer, complete_event): text = "SELECT * FROM " position = len(text)