Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/bruin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from bruin._connection import get_connection
from bruin._context import context
from bruin._query import query
from bruin._sheets import read_sheet, write_sheet

__version__ = "0.3.1"
__all__ = ["__version__", "context", "get_connection", "query"]
__all__ = ["__version__", "context", "get_connection", "query", "read_sheet", "write_sheet"]
142 changes: 132 additions & 10 deletions src/bruin/_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def __init__(self, name: str, raw: dict):
super().__init__(name, "google_cloud_platform", raw)
self._credentials = None
self._bigquery_client = None
self._sheets_client = None

def _uses_adc(self) -> bool:
"""Return True when the connection is configured for Application Default Credentials."""
Expand Down Expand Up @@ -145,15 +146,32 @@ def bigquery(self):
return self._bigquery_client

def sheets(self):
"""Return an authorized pygsheets client."""
try:
import pygsheets
except ImportError:
raise ImportError(
"Install bruin-sdk[sheets] to use Google Sheets connections: "
"pip install 'bruin-sdk[sheets]'"
"""Return a cached, authorized pygsheets client with Sheets + Drive scopes."""
if self._sheets_client is None:
try:
import pygsheets
except ImportError:
raise ImportError(
"Install bruin-sdk[sheets] to use Google Sheets connections: "
"pip install 'bruin-sdk[sheets]'"
)
_SHEETS_SCOPES = (
"https://www.googleapis.com/auth/spreadsheets",
"https://www.googleapis.com/auth/drive",
)
return pygsheets.authorize(custom_credentials=self.credentials)
scoped = self.credentials.with_scopes(_SHEETS_SCOPES)
self._sheets_client = pygsheets.authorize(custom_credentials=scoped)
return self._sheets_client

def read_sheet(self, spreadsheet, worksheet="Sheet1"):
"""Read a Google Sheets worksheet into a pandas DataFrame."""
from bruin._sheets import _read_sheet_impl
return _read_sheet_impl(self, spreadsheet, worksheet)

def write_sheet(self, df, spreadsheet, worksheet="Sheet1", fit=True):
"""Write a pandas DataFrame to a Google Sheets worksheet."""
from bruin._sheets import _write_sheet_impl
_write_sheet_impl(self, df, spreadsheet, worksheet, fit)

def storage(self):
"""Return a google.cloud.storage.Client."""
Expand All @@ -175,15 +193,116 @@ def client(self):
return self.bigquery()

def close(self):
"""Close the BigQuery client if it was initialized."""
"""Close the BigQuery and Sheets clients if initialized."""
if self._bigquery_client is not None:
logger.debug("Closing BigQuery client for connection '%s'", self.name)
close = getattr(self._bigquery_client, "close", None)
if callable(close):
close()
self._bigquery_client = None
self._sheets_client = None
self._credentials = None


class GoogleSheetsConnection(Connection):
"""Standalone Google Sheets connection (mirrors Go CLI's ``google_sheets`` type)."""

def __init__(self, name: str, raw: dict):
super().__init__(name, "google_sheets", raw)
self._credentials = None
self._sheets_client = None

def _parse_sa_info(self):
"""Parse the service account JSON from the connection payload."""
sa_json = self.raw.get("service_account_json", "")
if not sa_json:
sa_file = self.raw.get("service_account_file", "")
if sa_file:
return sa_file # sentinel — handled in credentials property
return None
try:
return json.loads(sa_json)
except (json.JSONDecodeError, TypeError) as exc:
raise ConnectionParseError(
f"Failed to parse service_account_json for '{self.name}': {exc}"
) from exc

@property
def credentials(self):
"""Return scoped google credentials for Sheets + Drive."""
if self._credentials is not None:
return self._credentials

_SCOPES = [
"https://www.googleapis.com/auth/spreadsheets",
"https://www.googleapis.com/auth/drive",
]

sa_info = self._parse_sa_info()

if sa_info is None:
raise ConnectionParseError(
f"Google Sheets connection '{self.name}' has no "
f"'service_account_json' or 'service_account_file'."
)

try:
from google.oauth2 import service_account
except ImportError:
raise ImportError(
"Install bruin-sdk[sheets] to use Google Sheets connections: "
"pip install 'bruin-sdk[sheets]'"
)

if isinstance(sa_info, str):
# sa_info is a file path (from service_account_file)
self._credentials = service_account.Credentials.from_service_account_file(
sa_info, scopes=_SCOPES,
)
else:
self._credentials = service_account.Credentials.from_service_account_info(
sa_info, scopes=_SCOPES,
)

logger.debug("Using service account credentials for '%s'", self.name)
return self._credentials

def sheets(self):
"""Return a cached, authorized pygsheets client."""
if self._sheets_client is None:
try:
import pygsheets
except ImportError:
raise ImportError(
"Install bruin-sdk[sheets] to use Google Sheets connections: "
"pip install 'bruin-sdk[sheets]'"
)
self._sheets_client = pygsheets.authorize(custom_credentials=self.credentials)
return self._sheets_client

def read_sheet(self, spreadsheet, worksheet="Sheet1"):
"""Read a Google Sheets worksheet into a pandas DataFrame."""
from bruin._sheets import _read_sheet_impl
return _read_sheet_impl(self, spreadsheet, worksheet)

def write_sheet(self, df, spreadsheet, worksheet="Sheet1", fit=True):
"""Write a pandas DataFrame to a Google Sheets worksheet."""
from bruin._sheets import _write_sheet_impl
_write_sheet_impl(self, df, spreadsheet, worksheet, fit)

@property
def client(self):
"""Alias for sheets() — the primary use case for this connection type."""
return self.sheets()

def close(self):
"""Clear cached client and credentials."""
self._sheets_client = None
self._credentials = None

def __repr__(self):
return f"GoogleSheetsConnection(name={self.name!r})"


def _create_client(conn_type: str, raw):
"""Create a database client based on connection type."""
Expand Down Expand Up @@ -212,7 +331,7 @@ def _create_client(conn_type: str, raw):
if factory is None:
raise ConnectionTypeError(
f"Unsupported connection type '{conn_type}'. "
f"Supported types: google_cloud_platform, {', '.join(sorted(factories))}."
f"Supported types: google_cloud_platform, google_sheets, {', '.join(sorted(factories))}."
)
return factory(raw)

Expand Down Expand Up @@ -584,4 +703,7 @@ def get_connection(name: str) -> "Connection | GCPConnection":
if conn_type == "google_cloud_platform":
return GCPConnection(name, raw)

if conn_type == "google_sheets":
return GoogleSheetsConnection(name, raw)

return Connection(name, conn_type, raw)
129 changes: 129 additions & 0 deletions src/bruin/_sheets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import logging

from bruin._connection import get_connection
from bruin.exceptions import ConnectionNotFoundError, ConnectionTypeError

logger = logging.getLogger("bruin")


def _get_sheets_client(connection):
"""Resolve *connection* to a ``(Connection, pygsheets.Client)`` pair.

Works with both ``google_cloud_platform`` and ``google_sheets`` connection types.
"""
if connection is None:
from bruin._context import context

connection = context.connection
if connection is None:
raise ConnectionNotFoundError(
"No connection specified and no default connection set "
"(BRUIN_CONNECTION env var is missing). "
"Pass a connection name explicitly: read_sheet(spreadsheet, connection='my_gcp')"
)

conn = get_connection(connection)

if not hasattr(conn, "sheets"):
raise ConnectionTypeError(
f"Connection '{conn.name}' ({conn.type}) does not support Google Sheets."
)
return conn, conn.sheets()


def _read_sheet_impl(conn, spreadsheet, worksheet="Sheet1"):
"""Core read logic that operates on an already-resolved connection object."""
gc = conn.sheets()
logger.debug("Reading '%s'.'%s' via '%s'", spreadsheet, worksheet, conn.name)

sh = gc.open_by_key(spreadsheet)
wks = sh.worksheet_by_title(worksheet)
df = wks.get_as_df(
numerize=True,
empty_value="",
include_tailing_empty=True,
include_tailing_empty_rows=False,
)

logger.debug("Read %d rows x %d cols", len(df), len(df.columns))
return df


def _write_sheet_impl(conn, df, spreadsheet, worksheet="Sheet1", fit=True):
"""Core write logic that operates on an already-resolved connection object."""
gc = conn.sheets()
logger.debug(
"Writing %d rows to '%s'.'%s' via '%s'",
len(df), spreadsheet, worksheet, conn.name,
)

sh = gc.open_by_key(spreadsheet)
wks = sh.worksheet_by_title(worksheet)
wks.clear()
wks.set_dataframe(df, start="A1", fit=fit, nan="", escape_formulae=True)

logger.debug("Write complete")


def read_sheet(spreadsheet, worksheet="Sheet1", connection=None):
"""Read a Google Sheets worksheet into a pandas DataFrame.

Parameters
----------
spreadsheet : str
The spreadsheet ID (the long string in the Google Sheets URL).
worksheet : str
The worksheet tab title. Defaults to ``"Sheet1"``.
connection : str, optional
Connection name. When *None*, falls back to the asset's default
connection (``BRUIN_CONNECTION`` env var).

Returns
-------
pandas.DataFrame
"""
conn, gc = _get_sheets_client(connection)
logger.debug("Reading '%s'.'%s' via '%s'", spreadsheet, worksheet, conn.name)

sh = gc.open_by_key(spreadsheet)
wks = sh.worksheet_by_title(worksheet)
df = wks.get_as_df(
numerize=True,
empty_value="",
include_tailing_empty=True,
include_tailing_empty_rows=False,
)

logger.debug("Read %d rows x %d cols", len(df), len(df.columns))
return df


def write_sheet(df, spreadsheet, worksheet="Sheet1", connection=None, fit=True):
"""Write a pandas DataFrame to a Google Sheets worksheet.

Parameters
----------
df : pandas.DataFrame
The data to write.
spreadsheet : str
The spreadsheet ID (the long string in the Google Sheets URL).
worksheet : str
The worksheet tab title. Defaults to ``"Sheet1"``.
connection : str, optional
Connection name. When *None*, falls back to the asset's default
connection (``BRUIN_CONNECTION`` env var).
fit : bool
If *True* (default), resize the sheet to match the DataFrame dimensions.
"""
conn, gc = _get_sheets_client(connection)
logger.debug(
"Writing %d rows to '%s'.'%s' via '%s'",
len(df), spreadsheet, worksheet, conn.name,
)

sh = gc.open_by_key(spreadsheet)
wks = sh.worksheet_by_title(worksheet)
wks.clear()
wks.set_dataframe(df, start="A1", fit=fit, nan="", escape_formulae=True)

logger.debug("Write complete")
16 changes: 16 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,3 +224,19 @@ def vertica_connection_json():
"password": "s3cret",
"database": "analytics",
}


@pytest.fixture
def google_sheets_connection_json():
"""Standalone Google Sheets connection payload (service_account_json is double-serialized)."""
sa_info = {
"type": "service_account",
"project_id": "my-gcp-project",
"private_key_id": "key-id-123",
"private_key": "-----BEGIN RSA PRIVATE KEY-----\nfake\n-----END RSA PRIVATE KEY-----\n",
"client_email": "sa@my-gcp-project.iam.gserviceaccount.com",
"client_id": "123456789",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
}
return {"service_account_json": json.dumps(sa_info)}
Loading