Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ on:

env:
CBCI_PROJECT_TYPE: "ANALYTICS"
CBCI_DEFAULT_PYTHON: "3.9"
CBCI_SUPPORTED_PYTHON_VERSIONS: "3.9 3.10 3.11 3.12 3.13"
CBCI_DEFAULT_PYTHON: "3.10"
CBCI_SUPPORTED_PYTHON_VERSIONS: "3.10 3.11 3.12 3.13 3.14"
CBCI_SUPPORTED_X86_64_PLATFORMS: "linux alpine macos windows"
CBCI_SUPPORTED_ARM64_PLATFORMS: "linux macos"
CBCI_DEFAULT_LINUX_X86_64_PLATFORM: "ubuntu-22.04"
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ on:

env:
CBCI_PROJECT_TYPE: "ANALYTICS"
CBCI_DEFAULT_PYTHON: "3.9"
CBCI_SUPPORTED_PYTHON_VERSIONS: "3.9 3.10 3.11 3.12 3.13"
CBCI_DEFAULT_PYTHON: "3.10"
CBCI_SUPPORTED_PYTHON_VERSIONS: "3.10 3.11 3.12 3.13 3.14"
CBCI_SUPPORTED_X86_64_PLATFORMS: "linux alpine macos windows"
CBCI_SUPPORTED_ARM64_PLATFORMS: "linux macos"
CBCI_DEFAULT_LINUX_X86_64_PLATFORM: "ubuntu-22.04"
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/verify_release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ on:

env:
CBCI_PROJECT_TYPE: "ANALYTICS"
CBCI_DEFAULT_PYTHON: "3.9"
CBCI_SUPPORTED_PYTHON_VERSIONS: "3.9 3.10 3.11 3.12 3.13"
CBCI_DEFAULT_PYTHON: "3.10"
CBCI_SUPPORTED_PYTHON_VERSIONS: "3.10 3.11 3.12 3.13 3.14"
CBCI_SUPPORTED_X86_64_PLATFORMS: "linux alpine macos windows"
CBCI_SUPPORTED_ARM64_PLATFORMS: "linux macos"
CBCI_DEFAULT_LINUX_X86_64_PLATFORM: "ubuntu-22.04"
Expand Down
6 changes: 3 additions & 3 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,12 @@ repos:
# Compile requirements
- id: pip-compile
name: pip-compile requirements.in
args: [requirements.in, --python-version, '3.9', --universal, -o, requirements.txt]
args: [requirements.in, --python-version, '3.10', --universal, -o, requirements.txt]
- id: pip-compile
name: pip-compile requirements-dev.in
args: [requirements-dev.in, --python-version, '3.9', --universal, -o, requirements-dev.txt]
args: [requirements-dev.in, --python-version, '3.10', --universal, -o, requirements-dev.txt]
files: ^requirements-dev\.(in|txt)$
- id: pip-compile
name: pip-compile requirements-sphinx.in
args: [requirements-sphinx.in, --python-version, '3.9', --universal, -o, requirements-sphinx.txt]
args: [requirements-sphinx.in, --python-version, '3.10', --universal, -o, requirements-sphinx.txt]
files: ^requirements-sphinx\.(in|txt)$
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Couchbase Python Analytics Client
Python client for [Couchbase](https://couchbase.com) Analytics.

Currently Python 3.9 - Python 3.13 is supported.
Currently Python 3.10 - Python 3.14 is supported.

The Analytics SDK supports static typing. Currently only [mypy](https://github.com/python/mypy) is supported. You mileage may vary (YMMV) with the use of other static type checkers (e.g. [pyright](https://github.com/microsoft/pyright)).

Expand Down
22 changes: 9 additions & 13 deletions acouchbase_analytics/protocol/_core/client_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from couchbase_analytics.common.deserializer import Deserializer
from couchbase_analytics.common.logging import LogLevel, log_message
from couchbase_analytics.protocol._core.auth import DynamicCredentialAuth
from couchbase_analytics.protocol._core.request import CancelRequest, HttpRequest, QueryRequest, StartQueryRequest
from couchbase_analytics.protocol._core.request import HttpRequest
from couchbase_analytics.protocol.connection import _ConnectionDetails
from couchbase_analytics.protocol.options import OptionsBuilder

Expand Down Expand Up @@ -179,19 +179,15 @@ async def send_request(self, request: HttpRequest, stream: Optional[bool] = True
if not hasattr(self, '_client'):
raise RuntimeError('Client not created yet')

url = URL(
scheme=request.url.scheme,
host=request.url.ip,
port=request.url.port,
path=request.url.path,
url = URL(scheme=request.url.scheme, host=request.url.ip, port=request.url.port, path=request.url.path)
req = self._client.build_request(
request.method,
url,
data=request.data,
json=request.body,
headers=request.headers,
extensions=request.extensions,
)
if isinstance(request, (QueryRequest, StartQueryRequest)):
req = self._client.build_request(request.method, url, json=request.body, extensions=request.extensions)
else:
data = request.data if isinstance(request, CancelRequest) else None
req = self._client.build_request(
request.method, url, data=data, headers=request.headers, extensions=request.extensions
)

if stream is None:
stream = True
Expand Down
28 changes: 20 additions & 8 deletions acouchbase_analytics/protocol/_core/request_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,22 @@ def timed_out(self) -> bool:
def calculate_backoff(self) -> float:
return self._backoff_calc.calculate_backoff(self._error_context.num_attempts) / 1000

async def check_for_http_status_error(
self,
status_code: int,
ignore_not_found_status: Optional[bool] = False,
close_handler: Optional[Callable[[], Awaitable[None]]] = None,
) -> None:
ctx = str(self._error_context)
err = ErrorMapper.maybe_get_error_from_status_code(
status_code, ctx, ignore_not_found_status=ignore_not_found_status
)
if err is None:
return
if close_handler is not None:
await close_handler()
raise err

def create_response_task(self, fn: Callable[..., Coroutine[Any, Any, Any]], *args: object) -> Task[Any]:
if self._backend is None or self._backend.backend_lib != 'asyncio':
raise RuntimeError('Must use the asyncio backend to create a response task.')
Expand Down Expand Up @@ -188,10 +204,13 @@ async def process_response(
core_response: HttpCoreResponse,
close_handler: Callable[[], Awaitable[None]],
handle_context_shutdown: Optional[bool] = False,
ignore_not_found_status: Optional[bool] = False,
) -> Any:
# we have all the data, close the core response/stream
await close_handler()

await self.check_for_http_status_error(
core_response.status_code, ignore_not_found_status=ignore_not_found_status
)
try:
json_response = core_response.json()
except json.JSONDecodeError:
Expand Down Expand Up @@ -243,7 +262,6 @@ async def send_request(
'request_deadline': f'{self._request_deadline}',
}
self.log_message('HTTP response', LogLevel.DEBUG, message_data=message_data)
Comment thread
thejcfactor marked this conversation as resolved.
self._check_for_http_status_error(response.status_code, ignore_not_found_status=ignore_not_found_status)
return response

async def shutdown(
Expand All @@ -265,12 +283,6 @@ async def shutdown(
self._shutdown = True
self.log_message('Request context shutdown complete', LogLevel.INFO)

def _check_for_http_status_error(self, status_code: int, ignore_not_found_status: Optional[bool] = False) -> None:
ctx = str(self._error_context)
ErrorMapper.maybe_raise_error_from_status_code(
status_code, ctx, ignore_not_found_status=ignore_not_found_status
)

def _check_timed_out(self) -> None:
if self._request_state in (RequestState.Timeout, RequestState.Error):
return
Expand Down
6 changes: 5 additions & 1 deletion acouchbase_analytics/protocol/_core/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ async def _process_no_body_response(self) -> None:
if 200 <= status_code < 300 or status_code == 404:
await self._request_context.shutdown()
return
await self._request_context.check_for_http_status_error(status_code, ignore_not_found_status=True)
ctx = str(self._request_context.error_context)
raise WrappedError(AnalyticsError(context=ctx, message=f'Request failed with status {status_code}.'))

Expand All @@ -131,6 +132,9 @@ async def _process_response(self) -> None:
**INTERNAL**
"""
self._json_response = await self._request_context.process_response(
self._core_response, self.close, handle_context_shutdown=True
self._core_response,
self.close,
handle_context_shutdown=True,
ignore_not_found_status=self._has_no_body_response,
)
await self.set_metadata(json_data=self._json_response)
6 changes: 6 additions & 0 deletions acouchbase_analytics/protocol/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,12 @@ async def send_request(self) -> None:
self._request_context.start_stream(self._core_response)
# block until we either know we have rows or we have an error
await self._request_context.wait_for_results_or_errors()

# it is possible we have "rows" but the HTTP status code is indicative of an error (e.g. 404, 503, etc.),
# so we need to check for HTTP status errors before allowing iteration to continue
await self._request_context.check_for_http_status_error(
self._core_response.status_code, close_handler=self.close
)
if not self._request_context.okay_to_iterate:
await self._request_context.finish_processing_stream()
await self._process_response()
Expand Down
17 changes: 9 additions & 8 deletions couchbase_analytics/protocol/_core/client_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from couchbase_analytics.common.deserializer import Deserializer
from couchbase_analytics.common.logging import LogLevel, log_message
from couchbase_analytics.protocol._core.auth import DynamicCredentialAuth
from couchbase_analytics.protocol._core.request import CancelRequest, HttpRequest, QueryRequest, StartQueryRequest
from couchbase_analytics.protocol._core.request import HttpRequest
from couchbase_analytics.protocol.connection import _ConnectionDetails
from couchbase_analytics.protocol.options import OptionsBuilder

Expand Down Expand Up @@ -177,13 +177,14 @@ def send_request(self, request: HttpRequest, stream: Optional[bool] = True) -> R
raise RuntimeError('Client not created yet')

url = URL(scheme=request.url.scheme, host=request.url.ip, port=request.url.port, path=request.url.path)
if isinstance(request, (QueryRequest, StartQueryRequest)):
req = self._client.build_request(request.method, url, json=request.body, extensions=request.extensions)
else:
data = request.data if isinstance(request, CancelRequest) else None
req = self._client.build_request(
request.method, url, data=data, headers=request.headers, extensions=request.extensions
)
req = self._client.build_request(
request.method,
url,
data=request.data,
json=request.body,
headers=request.headers,
extensions=request.extensions,
)

if stream is None:
stream = True
Expand Down
87 changes: 46 additions & 41 deletions couchbase_analytics/protocol/_core/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,17 @@ class RequestExtensions(TypedDict, total=False):
trace: Optional[Callable[[str, str], Union[None, Coroutine[Any, Any, None]]]]


@dataclass
@dataclass(kw_only=True)
class HttpRequest:
url: RequestURL
extensions: RequestExtensions
path: str
method: str
headers: Mapping[str, str]
max_retries: int
data: Optional[Dict[str, Union[str, object]]] = None
body: Optional[Dict[str, Union[str, object]]] = None
should_stream: Optional[bool] = False
Comment thread
thejcfactor marked this conversation as resolved.
Comment thread
thejcfactor marked this conversation as resolved.

def add_trace_to_extensions(
self, handler: Callable[[str, str], Union[None, Coroutine[Any, Any, None]]]
Expand Down Expand Up @@ -96,22 +99,18 @@ def update_url(self, ip: str, path: str) -> HttpRequest:
return self


class CancelRequestData(TypedDict):
request_id: str


@dataclass
@dataclass(kw_only=True)
class CancelRequest(HttpRequest):
data: CancelRequestData
pass
Comment thread
thejcfactor marked this conversation as resolved.


@dataclass
@dataclass(kw_only=True)
class FetchResultsRequest(HttpRequest):
deserializer: Deserializer
should_stream: bool = True

Comment thread
thejcfactor marked this conversation as resolved.

@dataclass
@dataclass(kw_only=True)
class QueryRequest(HttpRequest):
deserializer: Deserializer
body: Dict[str, Union[str, object]]
Expand All @@ -128,11 +127,10 @@ def get_request_statement(self) -> Optional[str]:
return None


@dataclass
@dataclass(kw_only=True)
class StartQueryRequest(HttpRequest):
body: Dict[str, Union[str, object]]
options: Optional[StartQueryOptionsTransformedKwargs] = None
should_stream: bool = False

def get_request_statement(self) -> Optional[str]:
"""
Expand Down Expand Up @@ -171,20 +169,27 @@ def build_request_from_handle(self, handle: str, method: Optional[str] = None) -
max_retries = self._conn_details.get_max_retries()
parsed = urlparse(handle)
path = parsed.path if parsed.scheme else handle
return HttpRequest(self._conn_details.url, extensions, path, method=method, headers={}, max_retries=max_retries)
return HttpRequest(
url=self._conn_details.url,
extensions=extensions,
path=path,
method=method,
headers={},
max_retries=max_retries,
)

def build_cancel_request(self, request_id: str) -> CancelRequest:
extensions = deepcopy(self._extensions)
extensions['timeout']['read'] = self._handle_request_timeout
max_retries = self._conn_details.get_max_retries()
return CancelRequest(
self._conn_details.url,
extensions,
'/api/v1/active_requests',
'DELETE',
{'Content-Type': 'application/x-www-form-urlencoded'},
max_retries,
{'request_id': request_id},
url=self._conn_details.url,
extensions=extensions,
path='/api/v1/active_requests',
method='DELETE',
headers={'Content-Type': 'application/x-www-form-urlencoded'},
max_retries=max_retries,
data={'request_id': request_id},
)

def build_discard_results_request(self, handle: str) -> HttpRequest:
Expand All @@ -198,13 +203,13 @@ def build_fetch_results_request(
deserializer = q_opts.pop('deserializer', None) or self._conn_details.default_deserializer
max_retries = self._conn_details.get_max_retries()
return FetchResultsRequest(
base_request.url,
base_request.extensions,
base_request.path,
base_request.method,
{},
max_retries,
deserializer,
url=base_request.url,
extensions=base_request.extensions,
path=base_request.path,
method=base_request.method,
headers={},
max_retries=max_retries,
deserializer=deserializer,
)

def build_query_request(
Expand Down Expand Up @@ -241,14 +246,14 @@ def build_query_request(
max_retries = retries if retries is not None else self._conn_details.get_max_retries()

return QueryRequest(
self._conn_details.url,
extensions,
'',
'POST',
{},
max_retries,
deserializer,
body,
url=self._conn_details.url,
extensions=extensions,
path='',
method='POST',
headers={},
max_retries=max_retries,
deserializer=deserializer,
body=body,
options=q_opts,
enable_cancel=enable_cancel,
)
Expand Down Expand Up @@ -279,13 +284,13 @@ def build_start_query_request( # noqa: C901
max_retries = retries if retries is not None else self._conn_details.get_max_retries()

return StartQueryRequest(
self._conn_details.url,
extensions,
'',
'POST',
{},
max_retries,
body,
url=self._conn_details.url,
extensions=extensions,
path='',
method='POST',
headers={},
max_retries=max_retries,
body=body,
options=q_opts,
)

Expand Down
Loading
Loading