Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions acouchbase_analytics/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,9 @@ def start_query(self, statement: str, *args: object, **kwargs: object) -> Awaita
async def set_credential(self, credential: Credential) -> None:
"""Replace the credential used for subsequent HTTP requests.

Allows updating credentials (in particular, rotating a JWT) without restarting
the application. The new credential must be of the same type as the current
credential.
Use this to rotate a JWT or client certificate without restarting the
application. The new credential must be of the same type as the
current one.

Args:
credential: The new :class:`.Credential` to use.
Expand Down
59 changes: 40 additions & 19 deletions acouchbase_analytics/protocol/_core/client_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,28 +140,26 @@ async def close_client(self) -> None:
await self._client.aclose()
self.log_message('Cluster HTTP client closed', LogLevel.INFO)

def _build_client(self) -> AsyncClient:
auth = DynamicCredentialAuth(self._credential_holder)
if self._conn_details.is_secure():
if self._conn_details.ssl_context is None:
raise ValueError('SSL context is required for secure connections.')
transport = None
if self._http_transport_cls is not None:
transport = self._http_transport_cls(verify=self._conn_details.ssl_context)
return AsyncClient(verify=self._conn_details.ssl_context, auth=auth, transport=transport)
transport = None
if self._http_transport_cls is not None:
transport = self._http_transport_cls()
return AsyncClient(auth=auth, transport=transport)

async def create_client(self) -> None:
"""
**INTERNAL**
"""
if not hasattr(self, '_client'):
auth = DynamicCredentialAuth(self._credential_holder)
if self._conn_details.is_secure():
if self._conn_details.ssl_context is None:
raise ValueError('SSL context is required for secure connections.')
transport = None
if self._http_transport_cls is not None:
transport = self._http_transport_cls(verify=self._conn_details.ssl_context)
self._client = AsyncClient(
verify=self._conn_details.ssl_context,
auth=auth,
transport=transport,
)
else:
transport = None
if self._http_transport_cls is not None:
transport = self._http_transport_cls()
self._client = AsyncClient(auth=auth, transport=transport)
self._client = self._build_client()
self.log_message(
(f'Cluster HTTP client created: connection_details={self._conn_details.get_init_details()}'),
LogLevel.INFO,
Expand Down Expand Up @@ -206,8 +204,31 @@ def reset_client(self) -> None:
del self._client

async def update_credential(self, new_credential: Credential) -> None:
self._credential_holder.replace(new_credential)
# Future mTLS: await close_client(), rebuild SSL context, await create_client().
if new_credential._kind == 'cert':
# httpx pins the SSL context to the AsyncClient at construction,
# and the cert chain is part of that context. So a cert rotation
# needs a fresh Client. Build it before aclosing the old one,
# otherwise a concurrent send_request can see self._client gone.
self._credential_holder.credential._check_replaceable_with(new_credential)
old_client = getattr(self, '_client', None)
old_ssl_context = self._conn_details.ssl_context
old_sni_hostname = self._conn_details.sni_hostname
try:
self._conn_details.validate_security_options(new_credential)
# If the cluster hasn't issued a request yet there's no Client
# to swap; we still refreshed the SSL context above.
new_client = self._build_client() if old_client is not None else None
except Exception:
self._conn_details.ssl_context = old_ssl_context
self._conn_details.sni_hostname = old_sni_hostname
raise
if new_client is not None:
self._client = new_client
self._credential_holder.replace(new_credential)
if old_client is not None:
await old_client.aclose()
else:
self._credential_holder.replace(new_credential)
self.log_message('Cluster HTTP credential updated', LogLevel.INFO)


Expand Down
Loading
Loading