From b1501b0204f0198de4efc0043e69cf74f71bbd12 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Thu, 5 Feb 2026 16:03:33 +0700 Subject: [PATCH 1/7] feat: add middleware support for Flight calls --- influxdb_client_3/query/query_api.py | 15 +++++++++- tests/test_query.py | 41 ++++++++++++++++++++++++++-- tests/util/mocks.py | 25 ++++++++++++++++- 3 files changed, 77 insertions(+), 4 deletions(-) diff --git a/influxdb_client_3/query/query_api.py b/influxdb_client_3/query/query_api.py index 0ba92de..7631795 100644 --- a/influxdb_client_3/query/query_api.py +++ b/influxdb_client_3/query/query_api.py @@ -20,6 +20,7 @@ class QueryApiOptions(object): flight_client_options (dict): base set of flight client options passed to internal pyarrow.flight.FlightClient timeout(float): timeout in seconds to wait for a response disable_grpc_compression (bool): disable gRPC compression for query responses + middleware (list): list of middleware functions to be applied to Flight calls """ _DEFAULT_TIMEOUT = 300.0 tls_root_certs: bytes = None @@ -28,13 +29,15 @@ class QueryApiOptions(object): flight_client_options: dict = None timeout: float = None disable_grpc_compression: bool = False + middleware: list = None def __init__(self, root_certs_path: str, verify: bool, proxy: str, flight_client_options: dict, timeout: float = _DEFAULT_TIMEOUT, - disable_grpc_compression: bool = False): + disable_grpc_compression: bool = False, + middleware: list = None): """ Initialize a set of QueryApiOptions @@ -45,6 +48,7 @@ def __init__(self, root_certs_path: str, to be passed to internal pyarrow.flight.FlightClient. :param timeout: timeout in seconds to wait for a response. :param disable_grpc_compression: disable gRPC compression for query responses. + :param middleware: list of middleware functions to be applied to Flight calls. """ if root_certs_path: self.tls_root_certs = self._read_certs(root_certs_path) @@ -53,6 +57,7 @@ def __init__(self, root_certs_path: str, self.flight_client_options = flight_client_options self.timeout = timeout self.disable_grpc_compression = disable_grpc_compression + self.middleware = middleware def _read_certs(self, path: str) -> bytes: with open(path, "rb") as certs_file: @@ -81,6 +86,7 @@ class QueryApiOptionsBuilder(object): _flight_client_options: dict = None _timeout: float = None _disable_grpc_compression: bool = False + _middleware: list = None def root_certs(self, path: str): self._root_certs_path = path @@ -107,6 +113,10 @@ def disable_grpc_compression(self, disable: bool): self._disable_grpc_compression = disable return self + def middleware(self, middleware: list): + self._middleware = middleware + return self + def build(self) -> QueryApiOptions: """Build a QueryApiOptions object with previously set values""" return QueryApiOptions( @@ -116,6 +126,7 @@ def build(self) -> QueryApiOptions: flight_client_options=self._flight_client_options, timeout=self._timeout, disable_grpc_compression=self._disable_grpc_compression, + middleware=self._middleware ) @@ -181,6 +192,8 @@ def __init__(self, self._flight_client_options["generic_options"].append( ("grpc.compression_enabled_algorithms_bitset", 1) ) + if options.middleware: + self._flight_client_options["middleware"] = options.middleware if self._proxy: self._flight_client_options["generic_options"].append(("grpc.http_proxy", self._proxy)) self._flight_client = FlightClient(connection_string, **self._flight_client_options) diff --git a/tests/test_query.py b/tests/test_query.py index ade4b2d..adca62c 100644 --- a/tests/test_query.py +++ b/tests/test_query.py @@ -12,7 +12,7 @@ Ticket ) -from influxdb_client_3 import InfluxDBClient3 +from influxdb_client_3 import InfluxDBClient3, flight_client_options from influxdb_client_3.query.query_api import QueryApiOptionsBuilder, QueryApi from influxdb_client_3.version import USER_AGENT from tests.util import asyncio_run @@ -25,7 +25,8 @@ HeaderCheckServerMiddlewareFactory, NoopAuthHandler, get_req_headers, - set_req_headers + set_req_headers, ModifyAuthHeaderClientMiddlewareFactory, + HeaderCheckServerMiddlewareFactory1 ) @@ -311,6 +312,42 @@ def test_prepare_query(self): assert _req_headers['authorization'] == [f"Bearer {token}"] set_req_headers({}) + def test_query_with_middleware_success(self): + with HeaderCheckFlightServer( + auth_handler=NoopAuthHandler(), + middleware={"check": HeaderCheckServerMiddlewareFactory1()}) as server: + + middleware = [ModifyAuthHeaderClientMiddlewareFactory()] + client = InfluxDBClient3( + host=f'http://localhost:{server.port}', + org='test_org', + databse='test_db', + token='TEST_TOKEN', + flight_client_options=flight_client_options(middleware=middleware) + + ) + + df = client.query(query='SELECT * FROM test', mode="pandas") + self.assertIsNotNone(df) + + def test_query_with_missing_middleware(self): + with HeaderCheckFlightServer( + auth_handler=NoopAuthHandler(), + middleware={"check": HeaderCheckServerMiddlewareFactory1()}) as server: + + client = InfluxDBClient3( + host=f'http://localhost:{server.port}', + org='test_org', + databse='test_db', + token='TEST_TOKEN' + ) + + try: + df = client.query(query='SELECT * FROM test', mode="pandas") + self.fail("Should have failed due to missing middleware") + except Exception as e: + assert "Invalid header value from middleware" in str(e) + @asyncio_run async def test_query_async_pandas(self): with ConstantFlightServer() as server: diff --git a/tests/util/mocks.py b/tests/util/mocks.py index 5d7201e..08329b5 100644 --- a/tests/util/mocks.py +++ b/tests/util/mocks.py @@ -5,8 +5,9 @@ from pyarrow import ( array, Table, - concat_tables, ArrowException + concat_tables, ArrowException, flight ) +from pyarrow._flight import FlightInternalError from pyarrow.flight import ( FlightServerBase, RecordBatchStream, @@ -158,6 +159,28 @@ def number_batches(table): buf = struct.pack(' Date: Thu, 5 Feb 2026 16:13:18 +0700 Subject: [PATCH 2/7] test: add middleware handling tests in QueryApi --- tests/test_query.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/test_query.py b/tests/test_query.py index adca62c..61a08b7 100644 --- a/tests/test_query.py +++ b/tests/test_query.py @@ -176,11 +176,13 @@ def test_query_client_with_options(self): cert_chain = 'mTLS_explicit_chain' self.create_cert_file(cert_file) test_flight_client_options = {'private_key': private_key, 'cert_chain': cert_chain} + middleware = [ModifyAuthHeaderClientMiddlewareFactory()] options = QueryApiOptionsBuilder()\ .proxy(proxy_name) \ .root_certs(cert_file) \ .tls_verify(False) \ .flight_client_options(test_flight_client_options) \ + .middleware(middleware) \ .build() client = QueryApi(connection, @@ -196,6 +198,7 @@ def test_query_client_with_options(self): assert client._flight_client_options['private_key'] == private_key assert client._flight_client_options['cert_chain'] == cert_chain assert client._proxy == proxy_name + assert client._flight_client_options['middleware'] == middleware fc_opts = client._flight_client_options assert dict(fc_opts['generic_options'])['grpc.secondary_user_agent'].startswith('influxdb3-python/') assert dict(fc_opts['generic_options'])['grpc.http_proxy'] == proxy_name @@ -343,7 +346,7 @@ def test_query_with_missing_middleware(self): ) try: - df = client.query(query='SELECT * FROM test', mode="pandas") + client.query(query='SELECT * FROM test', mode="pandas") self.fail("Should have failed due to missing middleware") except Exception as e: assert "Invalid header value from middleware" in str(e) From 8dbfe145d5848b3a87b8efa5b2ce972961b9760f Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Thu, 5 Feb 2026 16:16:13 +0700 Subject: [PATCH 3/7] test: add missing blank lines between class declarations in mocks.py --- tests/util/mocks.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/util/mocks.py b/tests/util/mocks.py index 08329b5..99eac2e 100644 --- a/tests/util/mocks.py +++ b/tests/util/mocks.py @@ -159,6 +159,7 @@ def number_batches(table): buf = struct.pack(' Date: Thu, 5 Feb 2026 16:20:55 +0700 Subject: [PATCH 4/7] test: fix indentation in `test_query_with_missing_middleware` --- tests/test_query.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_query.py b/tests/test_query.py index 61a08b7..1e531c7 100644 --- a/tests/test_query.py +++ b/tests/test_query.py @@ -335,8 +335,8 @@ def test_query_with_middleware_success(self): def test_query_with_missing_middleware(self): with HeaderCheckFlightServer( - auth_handler=NoopAuthHandler(), - middleware={"check": HeaderCheckServerMiddlewareFactory1()}) as server: + auth_handler=NoopAuthHandler(), + middleware={"check": HeaderCheckServerMiddlewareFactory1()}) as server: client = InfluxDBClient3( host=f'http://localhost:{server.port}', From a515ee68943249e50274339000a4d1968e317fa2 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Thu, 5 Feb 2026 16:39:23 +0700 Subject: [PATCH 5/7] test: rename `ModifyAuthHeaderClientMiddleware` to `ModifyHeaderClientMiddleware` and add example usage --- Examples/query_with_middleware.py | 33 +++++++++++++++++++++++++++++++ tests/test_query.py | 7 +++---- tests/util/mocks.py | 6 +++--- 3 files changed, 39 insertions(+), 7 deletions(-) create mode 100644 Examples/query_with_middleware.py diff --git a/Examples/query_with_middleware.py b/Examples/query_with_middleware.py new file mode 100644 index 0000000..e9f3d7f --- /dev/null +++ b/Examples/query_with_middleware.py @@ -0,0 +1,33 @@ +from pyarrow import flight + +from config import Config +from influxdb_client_3 import InfluxDBClient3, flight_client_options + + +# This middleware will add an additional attribute `some-attribute` to the header +class ModifyHeaderClientMiddleware(flight.ClientMiddleware): + def sending_headers(self): + return { + "some-attribute": "some-value", + } + + def received_headers(self, headers): + pass + + +class ModifyHeaderClientMiddlewareFactory(flight.ClientMiddlewareFactory): + def start_call(self, info): + return ModifyHeaderClientMiddleware() + + +config = Config() +middleware = [ModifyHeaderClientMiddlewareFactory()] +client = InfluxDBClient3( + host=config.host, + token=config.token, + database=config.database, + flight_client_options=flight_client_options(middleware=middleware) +) + +df = client.query(query="select * from cpu11 limit 10", mode="pandas") +print(len(df)) diff --git a/tests/test_query.py b/tests/test_query.py index 1e531c7..f702f17 100644 --- a/tests/test_query.py +++ b/tests/test_query.py @@ -25,7 +25,7 @@ HeaderCheckServerMiddlewareFactory, NoopAuthHandler, get_req_headers, - set_req_headers, ModifyAuthHeaderClientMiddlewareFactory, + set_req_headers, ModifyHeaderClientMiddlewareFactory, HeaderCheckServerMiddlewareFactory1 ) @@ -176,7 +176,7 @@ def test_query_client_with_options(self): cert_chain = 'mTLS_explicit_chain' self.create_cert_file(cert_file) test_flight_client_options = {'private_key': private_key, 'cert_chain': cert_chain} - middleware = [ModifyAuthHeaderClientMiddlewareFactory()] + middleware = [ModifyHeaderClientMiddlewareFactory()] options = QueryApiOptionsBuilder()\ .proxy(proxy_name) \ .root_certs(cert_file) \ @@ -320,14 +320,13 @@ def test_query_with_middleware_success(self): auth_handler=NoopAuthHandler(), middleware={"check": HeaderCheckServerMiddlewareFactory1()}) as server: - middleware = [ModifyAuthHeaderClientMiddlewareFactory()] + middleware = [ModifyHeaderClientMiddlewareFactory()] client = InfluxDBClient3( host=f'http://localhost:{server.port}', org='test_org', databse='test_db', token='TEST_TOKEN', flight_client_options=flight_client_options(middleware=middleware) - ) df = client.query(query='SELECT * FROM test', mode="pandas") diff --git a/tests/util/mocks.py b/tests/util/mocks.py index 99eac2e..8716e19 100644 --- a/tests/util/mocks.py +++ b/tests/util/mocks.py @@ -160,7 +160,7 @@ def number_batches(table): yield batch, buf -class ModifyAuthHeaderClientMiddleware(flight.ClientMiddleware): +class ModifyHeaderClientMiddleware(flight.ClientMiddleware): def sending_headers(self): return { "header-from-middleware": "some-value", @@ -170,9 +170,9 @@ def received_headers(self, headers): pass -class ModifyAuthHeaderClientMiddlewareFactory(flight.ClientMiddlewareFactory): +class ModifyHeaderClientMiddlewareFactory(flight.ClientMiddlewareFactory): def start_call(self, info): - return ModifyAuthHeaderClientMiddleware() + return ModifyHeaderClientMiddleware() class HeaderCheckServerMiddlewareFactory1(ServerMiddlewareFactory): From 4fd3910a042f5d1e60504931d1ae909dbaae1cdc Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Thu, 5 Feb 2026 16:45:39 +0700 Subject: [PATCH 6/7] chore: update CHANGELOG with middleware support for Flight client --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c021310..d8380d4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ ## 0.18.0 [unreleased] +### Features + +1. [#196](https://github.com/InfluxCommunity/influxdb3-python/pull/196): Support passing middleware functions to the Flight client. + ### Bug Fixes 1. [#194](https://github.com/InfluxCommunity/influxdb3-python/pull/194): Fix `InfluxDBClient3.write_file()` and `InfluxDBClient3.write_dataframe()` fail with batching mode. From b0c2f3e8e5fa11eae3613f8b1a6b27d26f1ad65f Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Thu, 5 Feb 2026 16:51:25 +0700 Subject: [PATCH 7/7] [EMPTY] trigger CI