Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 60 additions & 26 deletions influxdb_client_3/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import importlib.util
import json
import os
import urllib.parse
from typing import Any, List, Literal, Optional, TYPE_CHECKING

import pyarrow as pa

from influxdb_client_3.version import USER_AGENT
from influxdb_client_3.write_client._sync import rest_client as rest

if TYPE_CHECKING:
import pandas as pd
import polars as pl
Expand All @@ -14,7 +18,7 @@
from influxdb_client_3.exceptions import InfluxDBError
from influxdb_client_3.query.query_api import QueryApi as _QueryApi, QueryApiOptionsBuilder
from influxdb_client_3.read_file import UploadFile
from influxdb_client_3.write_client import InfluxDBClient as _InfluxDBClient, WriteOptions, Point
from influxdb_client_3.write_client import WriteOptions, Point
from influxdb_client_3.write_client.client.write_api import WriteApi as _WriteApi, SYNCHRONOUS, ASYNCHRONOUS, \
PointSettings, DefaultWriteOptions, WriteType
from influxdb_client_3.write_client.domain.write_precision import WritePrecision
Expand Down Expand Up @@ -185,10 +189,13 @@ def _parse_timeout(to: str) -> int:
class InfluxDBClient3:
def __init__(
self,
host=None,
host='localhost',
org=None,
database=None,
token=None,
auth_scheme=None,
enable_gzip=False,
gzip_threshold=None,
write_client_options=None,
flight_client_options=None,
write_port_overwrite=None,
Expand All @@ -212,6 +219,10 @@ def __init__(
:type flight_client_options: dict[str, any]
:param disable_grpc_compression: Disable gRPC compression for Flight query responses. Default is False.
:type disable_grpc_compression: bool
:param enable_gzip: Enable GZIP compression for write requests.
:type enable_gzip: bool
:param gzip_threshold: Minimum payload size (bytes) to trigger GZIP when enable_gzip is True.
:type gzip_threshold: int
:key auth_scheme: token authentication scheme. Set to "Bearer" for Edge.
:key bool verify_ssl: Set this to false to skip verifying SSL certificate when calling API from https server.
:key str ssl_ca_cert: Set this to customize the certificate file to verify the peer.
Expand Down Expand Up @@ -293,14 +304,45 @@ def __init__(
if write_port_overwrite is not None:
port = write_port_overwrite

self._client = _InfluxDBClient(
url=f"{scheme}://{hostname}:{port}",
# TODO fix retries
retries = None

auth_schema = 'Token' if auth_scheme is None else auth_scheme
default_header = {
'User-Agent': USER_AGENT
}
if self._token is not None:
default_header['Authorization'] = f'{auth_schema} {self._token}'
self.base_url = f"{scheme}://{hostname}:{port}"
self.default_header = default_header
self.rest_client = rest.RestClient(
base_url=self.base_url,
default_header=default_header,
verify_ssl=kwargs.get('verify_ssl', True),
ssl_ca_cert=kwargs.get('ssl_ca_cert', None),
cert_file=kwargs.get('cert_file', None),
cert_key_file=kwargs.get('cert_key_file', None),
cert_key_password=kwargs.get('cert_key_password', None),
ssl_context=kwargs.get('ssl_context', None),
proxy=kwargs.get('proxy', None),
proxy_headers=kwargs.get('proxy_headers', None),
retries=retries,
)

# TODO point_settings??

self._write_api = _WriteApi(
token=self._token,
bucket=self._database,
org=self._org,
gzip_threshold=gzip_threshold,
enable_gzip=enable_gzip,
auth_scheme=auth_scheme,
timeout=write_timeout,
**kwargs)

self._write_api = _WriteApi(influxdb_client=self._client, **self._write_client_options)
default_header=default_header,
rest_client=self.rest_client,
**self._write_client_options
)

if query_port_overwrite is not None:
port = query_port_overwrite
Expand Down Expand Up @@ -658,32 +700,25 @@ async def query_async(self, query: str, language: str = "sql", mode: str = "all"
except ArrowException as e:
raise InfluxDB3ClientQueryError(f"Error while executing query: {e}")

def get_server_version(self) -> str:
def get_server_version(self) -> Optional[str]:
"""
Get the version of the connected InfluxDB server.
Get the influxdb_version of the connected InfluxDB server.

This method makes a ping request to the server and extracts the version information
This method makes a ping request to the server and extracts the influxdb_version information
from either the response headers or response body.

:return: The version string of the InfluxDB server.
:return: The influxdb_version string of the InfluxDB server.
:rtype: str
"""
version = None
(resp_body, _, header) = self._client.api_client.call_api(
resource_path="/ping",
method="GET",
response_type=object
)

for key, value in header.items():
resp = self.rest_client.request(url='/ping', method="GET", headers=self.default_header)
for key, value in resp.getheaders().items():
if key.lower() == "x-influxdb-version":
version = value
break

if version is None and isinstance(resp_body, dict):
version = resp_body['version']
return value

return version
string_body = resp.get_string_body()
if string_body is not None:
return json.loads(string_body)['version']
return None

def flush(self):
"""
Expand All @@ -702,7 +737,6 @@ def close(self):
"""Close the client and clean up resources."""
self._write_api.close()
self._query_api.close()
self._client.close()

def __enter__(self):
return self
Expand Down
10 changes: 2 additions & 8 deletions influxdb_client_3/write_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,9 @@

from __future__ import absolute_import

from influxdb_client_3.write_client.client.write_api import WriteApi, WriteOptions
from influxdb_client_3.write_client.client.influxdb_client import InfluxDBClient
from influxdb_client_3.write_client.client.logging_handler import InfluxLoggingHandler
from influxdb_client_3.version import VERSION
from influxdb_client_3.write_client.client.write.point import Point

from influxdb_client_3.write_client.service.write_service import WriteService

from influxdb_client_3.write_client.client.write_api import WriteApi, WriteOptions
from influxdb_client_3.write_client.domain.write_precision import WritePrecision

from influxdb_client_3.write_client.configuration import Configuration
from influxdb_client_3.version import VERSION
__version__ = VERSION
Loading
Loading