Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ Upcoming (TBD)
Features
---------
* Add prompt format string for literal backslash.
* Add collation completions, and complete charsets in more positions.


Bug Fixes
---------
* Suppress warnings when `sqlglotrs` is installed.



1.64.0 (2026/03/13)
==============

Expand Down
5 changes: 5 additions & 0 deletions mycli/completion_refresher.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,11 @@ def refresh_character_sets(completer: SQLCompleter, executor: SQLExecute) -> Non
completer.extend_character_sets(executor.character_sets())


@refresher("collations")
def refresh_collations(completer: SQLCompleter, executor: SQLExecute) -> None:
completer.extend_collations(executor.collations())


@refresher("special_commands")
def refresh_special(completer: SQLCompleter, executor: SQLExecute) -> None:
completer.extend_special_commands(list(COMMANDS.keys()))
Expand Down
68 changes: 56 additions & 12 deletions mycli/packages/completion_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,23 @@ def _enum_value_suggestion(text_before_cursor: str, full_text: str) -> dict[str,
}


def _charset_suggestion(tokens: list[Token]) -> list[dict[str, str]] | None:
token_values = [token.value.lower() for token in tokens if token.value]

if len(token_values) >= 2 and token_values[-1] == 'set' and token_values[-2] == 'character':
return [{'type': 'character_set'}]
if len(token_values) >= 3 and token_values[-2] == 'set' and token_values[-3] == 'character':
return [{'type': 'character_set'}]
if len(token_values) >= 5 and token_values[-1] == 'using' and token_values[-4] == 'convert':
return [{'type': 'character_set'}]
if len(token_values) >= 6 and token_values[-2] == 'using' and token_values[-5] == 'convert':
return [{'type': 'character_set'}]
if len(token_values) >= 1 and token_values[-1] == 'collate':
return [{'type': 'collation'}]

return None


def _is_where_or_having(token: Token | None) -> bool:
return bool(token and token.value and token.value.lower() in ("where", "having"))

Expand Down Expand Up @@ -261,6 +278,7 @@ def suggest_based_on_last_token(

# don't suggest anything inside a string or number
if word_before_cursor:
# todo: example where this fails: completing on COLLATE with string "0900"
if re.match(r'^[\d\.]', word_before_cursor[0]):
return []
# more efficient if no space was typed yet in the string
Expand All @@ -272,6 +290,14 @@ def suggest_based_on_last_token(
if is_inside_quotes(text_before_cursor, -1) in ['single', 'double']:
return []

try:
# todo: pass in the complete list of tokens to avoid multiple parsing passes
parsed = sqlparse.parse(text_before_cursor)[0]
tokens_wo_space = [x for x in parsed.tokens if x.ttype != sqlparse.tokens.Token.Text.Whitespace]
except (AttributeError, IndexError, ValueError, sqlparse.exceptions.SQLParseError):
parsed = sqlparse.sql.Statement()
tokens_wo_space = []

if isinstance(token, str):
token_v = token.lower()
elif isinstance(token, Comparison):
Expand All @@ -286,7 +312,15 @@ def suggest_based_on_last_token(
# sqlparse groups all tokens from the where clause into a single token
# list. This means that token.value may be something like
# 'where foo > 5 and '. We need to look "inside" token.tokens to handle
# suggestions in complicated where clauses correctly
# suggestions in complicated where clauses correctly.
#
# This logic also needs to look even deeper in to the WHERE clause.
# We recapitulate some transcoding suggestions here, but cannot
# recapitulate the entire logic of this function.
where_tokens = [x for x in token.tokens if x.ttype != sqlparse.tokens.Token.Text.Whitespace]
if transcoding_suggestion := _charset_suggestion(where_tokens):
return transcoding_suggestion

original_text = text_before_cursor
prev_keyword, text_before_cursor = find_prev_keyword(text_before_cursor)
enum_suggestion = _enum_value_suggestion(original_text, full_text)
Expand All @@ -303,12 +337,12 @@ def suggest_based_on_last_token(

if not token:
return [{"type": "keyword"}, {"type": "special"}]
elif token_v == "*":

if token_v == "*":
return [{"type": "keyword"}]
elif token_v.endswith("("):
p = sqlparse.parse(text_before_cursor)[0]

if p.tokens and isinstance(p.tokens[-1], Where):
if token_v.endswith("("):
if parsed.tokens and isinstance(parsed.tokens[-1], Where):
# Four possibilities:
# 1 - Parenthesized clause like "WHERE foo AND ("
# Suggest columns/functions
Expand All @@ -323,7 +357,7 @@ def suggest_based_on_last_token(
column_suggestions = suggest_based_on_last_token("where", text_before_cursor, None, full_text, identifier)

# Check for a subquery expression (cases 3 & 4)
where = p.tokens[-1]
where = parsed.tokens[-1]
_idx, prev_tok = where.token_prev(len(where.tokens) - 1)

if isinstance(prev_tok, Comparison):
Expand All @@ -337,25 +371,29 @@ def suggest_based_on_last_token(
return column_suggestions

# Get the token before the parens
idx, prev_tok = p.token_prev(len(p.tokens) - 1)
idx, prev_tok = parsed.token_prev(len(parsed.tokens) - 1)
if prev_tok and prev_tok.value and prev_tok.value.lower() == "using":
# tbl1 INNER JOIN tbl2 USING (col1, col2)
tables = extract_tables(full_text)

# suggest columns that are present in more than one table
return [{"type": "column", "tables": tables, "drop_unique": True}]
elif p.token_first().value.lower() == "select":
elif parsed.tokens and parsed.token_first().value.lower() == "select":
# If the lparen is preceeded by a space chances are we're about to
# do a sub-select.
if last_word(text_before_cursor, "all_punctuations").startswith("("):
return [{"type": "keyword"}]
elif p.token_first().value.lower() == "show":
elif parsed.tokens and parsed.token_first().value.lower() == "show":
return [{"type": "show"}]

# We're probably in a function argument list
return [{"type": "column", "tables": extract_tables(full_text)}]
elif token_v in ("call"):
return [{"type": "procedure", "schema": []}]
elif token_v in ('set') and len(tokens_wo_space) >= 3 and tokens_wo_space[-3].value.lower() == 'character':
return [{'type': 'character_set'}]
elif token_v in ('set') and len(tokens_wo_space) >= 2 and tokens_wo_space[-2].value.lower() == 'character':
return [{'type': 'character_set'}]
elif token_v in ("set", "order by", "distinct"):
return [{"type": "column", "tables": extract_tables(full_text)}]
elif token_v == "as":
Expand All @@ -364,13 +402,19 @@ def suggest_based_on_last_token(
elif token_v in ("show"):
return [{"type": "show"}]
elif token_v in ("to",):
p = sqlparse.parse(text_before_cursor)[0]
if p.token_first().value.lower() == "change":
if parsed.tokens and parsed.token_first().value.lower() == "change":
return [{"type": "change"}]
else:
return [{"type": "user"}]
elif token_v in ("user", "for"):
return [{"type": "user"}]
elif token_v in ('collate'):
return [{'type': 'collation'}]
# some duplication with _charset_suggestion()
elif token_v in ('using') and len(tokens_wo_space) >= 5 and tokens_wo_space[-5].value.lower() == 'convert':
return [{'type': 'character_set'}]
elif token_v in ('using') and len(tokens_wo_space) >= 4 and tokens_wo_space[-4].value.lower() == 'convert':
return [{'type': 'character_set'}]
elif token_v in ("select", "where", "having"):
# Check for a table alias or schema qualification
parent = (identifier and identifier.get_parent_name()) or []
Expand Down Expand Up @@ -399,7 +443,7 @@ def suggest_based_on_last_token(
return [
{"type": "column", "tables": tables},
{"type": "function", "schema": []},
{"type": "introducer", "schema": []},
{"type": "introducer"},
{"type": "alias", "aliases": aliases},
]
elif (
Expand Down
42 changes: 34 additions & 8 deletions mycli/sqlcompleter.py
Original file line number Diff line number Diff line change
Expand Up @@ -927,6 +927,10 @@ class SQLCompleter(Completer):

users: list[str] = []

character_sets: list[str] = []

collations: list[str] = []

def __init__(
self,
smart_completion: bool = True,
Expand Down Expand Up @@ -1087,31 +1091,38 @@ def extend_procedures(self, procedure_data: Generator[tuple]) -> None:
metadata[self.dbname][elt[0]] = None

def extend_character_sets(self, character_set_data: Generator[tuple]) -> None:
metadata = self.dbmetadata["character_sets"]
if self.dbname not in metadata:
metadata[self.dbname] = {}

for elt in character_set_data:
if not elt:
continue
if not elt[0]:
continue
metadata[self.dbname][elt[0]] = None
self.character_sets.append(elt[0])
self.all_completions.update(elt[0])

def extend_collations(self, collation_data: Generator[tuple]) -> None:
for elt in collation_data:
if not elt:
continue
if not elt[0]:
continue
self.collations.append(elt[0])
self.all_completions.update(elt[0])

def set_dbname(self, dbname: str | None) -> None:
self.dbname = dbname or ''

def reset_completions(self) -> None:
self.databases: list[str] = []
self.users: list[str] = []
self.character_sets: list[str] = []
self.collations: list[str] = []
self.show_items: list[Completion] = []
self.dbname = ""
self.dbmetadata: dict[str, Any] = {
"tables": {},
"views": {},
"functions": {},
"procedures": {},
"character_sets": {},
"enum_values": {},
}
self.all_completions = set(self.keywords + self.functions)
Expand Down Expand Up @@ -1321,15 +1332,30 @@ def get_completions(
completions.extend([(*x, rank) for x in procs_m])

elif suggestion['type'] == 'introducer':
charsets = self.populate_schema_objects(suggestion['schema'], 'character_sets')
introducers = [f'_{x}' for x in charsets]
introducers = [f'_{x}' for x in self.character_sets]
introducers_m = self.find_matches(
word_before_cursor,
introducers,
text_before_cursor=document.text_before_cursor,
)
completions.extend([(*x, rank) for x in introducers_m])

elif suggestion['type'] == 'character_set':
charsets_m = self.find_matches(
word_before_cursor,
self.character_sets,
text_before_cursor=document.text_before_cursor,
)
completions.extend([(*x, rank) for x in charsets_m])

elif suggestion['type'] == 'collation':
collations_m = self.find_matches(
word_before_cursor,
self.collations,
text_before_cursor=document.text_before_cursor,
)
completions.extend([(*x, rank) for x in collations_m])

elif suggestion["type"] == "table":
# If this is a select and columns are given, parse the columns and
# then only return tables that have one or more of the given columns.
Expand Down
16 changes: 16 additions & 0 deletions mycli/sqlexecute.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ class SQLExecute:

character_sets_query = '''SHOW CHARACTER SET'''

collations_query = '''SHOW COLLATION'''

table_columns_query = """select TABLE_NAME, COLUMN_NAME from information_schema.columns
where table_schema = %s
order by table_name,ordinal_position"""
Expand Down Expand Up @@ -482,6 +484,20 @@ def character_sets(self) -> Generator[tuple, None, None]:
else:
yield from cur

def collations(self) -> Generator[tuple, None, None]:
"""Yields tuples of (collation_name, )"""

assert isinstance(self.conn, Connection)
with self.conn.cursor() as cur:
_logger.debug("Collations Query. sql: %r", self.collations_query)
try:
cur.execute(self.collations_query)
except pymysql.DatabaseError as e:
_logger.error('No collations completions due to %r', e)
yield ()
else:
yield from cur

def show_candidates(self) -> Generator[tuple, None, None]:
assert isinstance(self.conn, Connection)
with self.conn.cursor() as cur:
Expand Down
Loading
Loading