Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,10 @@
<artifactId>arrow-memory-netty</artifactId>
<version>17.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.tika</groupId>
<artifactId>tika-core</artifactId>
</dependency>
</dependencies>
<build>
<resources>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,10 @@ private Completable logEvent(
parseFuture =
state
.getParser()
.parse(content)
.parse(
content,
traceIds.traceId(),
traceIds.spanId() != null ? traceIds.spanId() : "no_span")
.thenAccept(
parsedContent -> {
row.put(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright 2026 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.adk.plugins.agentanalytics;

import static java.nio.charset.StandardCharsets.UTF_8;

import com.google.auth.Credentials;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.jspecify.annotations.Nullable;

/** Offloads content to GCS. */
class GcsOffloader {
private final Storage storage;
private final String bucketName;
private final Executor executor;
private final boolean isStorageOverride;

GcsOffloader(
String projectId,
String bucketName,
Executor executor,
@Nullable Credentials credentials,
@Nullable Storage storageOverride) {
if (storageOverride != null) {
this.isStorageOverride = true;
this.storage = storageOverride;
} else {
this.isStorageOverride = false;
StorageOptions.Builder builder = StorageOptions.newBuilder().setProjectId(projectId);
if (credentials != null) {
builder.setCredentials(credentials);
}
this.storage = builder.build().getService();
}
this.bucketName = bucketName;
this.executor = executor;
}

/** Async wrapper around blocking GCS upload for binary data. */
CompletableFuture<String> uploadContent(byte[] data, String contentType, String path) {
return CompletableFuture.supplyAsync(
() -> {
BlobId blobId = BlobId.of(bucketName, path);
BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType(contentType).build();
storage.create(blobInfo, data);
return String.format("gs://%s/%s", bucketName, path);
},
executor);
}

/** Async wrapper around blocking GCS upload for text data. */
CompletableFuture<String> uploadContent(String data, String contentType, String path) {
return uploadContent(data.getBytes(UTF_8), contentType, path);
}

String getBucketName() {
return bucketName;
}

void close() throws Exception {
if (storage != null && !isStorageOverride) {
storage.close();
}
}
}
146 changes: 129 additions & 17 deletions core/src/main/java/com/google/adk/plugins/agentanalytics/Parser.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static com.google.adk.plugins.agentanalytics.JsonFormatter.mapper;
import static com.google.adk.plugins.agentanalytics.JsonFormatter.smartTruncate;
import static com.google.adk.plugins.agentanalytics.JsonFormatter.truncate;
import static com.google.adk.plugins.agentanalytics.JsonFormatter.truncateAndAddSuffix;
import static com.google.adk.plugins.agentanalytics.JsonFormatter.truncateWithStatus;

import com.fasterxml.jackson.annotation.JsonProperty;
Expand All @@ -39,16 +40,43 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.tika.mime.MimeTypeException;
import org.apache.tika.mime.MimeTypes;
import org.jspecify.annotations.Nullable;
import org.threeten.bp.Instant;
import org.threeten.bp.LocalDate;
import org.threeten.bp.ZoneOffset;

/** Utility for parsing content for BigQuery logging. */
final class Parser {
private static final String DEFAULT_EXTENSION = ".bin";
private static final int MAX_OFFLOADED_TEXT_LENGTH = 200;
private static final Logger logger = Logger.getLogger(Parser.class.getName());
private static final int INLINE_TEXT_LIMIT = 32 * 1024; // 32KB limit
private static final String UPLOAD_FAILED_MESSAGE = "[UPLOAD FAILED]";
private static final String MEDIA_OFFLOADED_MESSAGE = "[MEDIA OFFLOADED]";
private static final String BINARY_DATA_MESSAGE = "[BINARY DATA]";
private final int maxLength;
private static final String TEXT_OFFLOADED_SUFFIX = "... [OFFLOADED]";
private static final MimeTypes MIME_TYPES = MimeTypes.getDefaultMimeTypes();

Parser(int maxLength) {
private final @Nullable GcsOffloader offloader;
private final int maxLength;
private final @Nullable String connectionId;
private final boolean logMultiModalContent;

Parser(
@Nullable GcsOffloader offloader,
int maxLength,
@Nullable String connectionId,
boolean logMultiModalContent) {
this.offloader = offloader;
this.maxLength = maxLength;
this.connectionId = connectionId;
this.logMultiModalContent = logMultiModalContent;
}

@AutoValue
Expand Down Expand Up @@ -152,23 +180,27 @@ static ObjectRef create(
* Parses content into JSON payload and content parts, matching Python implementation.
*
* @param content the content to parse
* @param traceId the trace ID for GCS path
* @param spanId the span ID for GCS path
* @return a CompletableFuture of ParsedContent object
*/
CompletableFuture<ParsedContent> parse(Object content) {
CompletableFuture<ParsedContent> parse(Object content, String traceId, String spanId) {
if (content instanceof LlmRequest llmRequest) {
ObjectNode jsonPayload = mapper.createObjectNode();
ArrayNode messages = mapper.createArrayNode();
List<CompletableFuture<ParsedContentObject>> futures = new ArrayList<>();
List<Content> contents = llmRequest.contents();

for (Content c : contents) {
futures.add(parseContentObject(c));
futures.add(parseContentObject(c, traceId, spanId));
}

CompletableFuture<ParsedContentObject> systemFuture = null;
if (llmRequest.config().isPresent()
&& llmRequest.config().get().systemInstruction().isPresent()) {
systemFuture = parseContentObject(llmRequest.config().get().systemInstruction().get());
systemFuture =
parseContentObject(
llmRequest.config().get().systemInstruction().get(), traceId, spanId);
futures.add(systemFuture);
}
CompletableFuture<ParsedContentObject> finalSystemFuture = systemFuture;
Expand Down Expand Up @@ -202,7 +234,7 @@ CompletableFuture<ParsedContent> parse(Object content) {
}
if (content instanceof LlmResponse llmResponse) {
ObjectNode jsonPayload = mapper.createObjectNode();
return parseContentObject(llmResponse.content().orElse(null))
return parseContentObject(llmResponse.content().orElse(null), traceId, spanId)
.thenApply(
parsed -> {
ObjectNode summaryNode = mapper.createObjectNode();
Expand All @@ -225,7 +257,7 @@ CompletableFuture<ParsedContent> parse(Object content) {
});
}
if (content instanceof Content || content instanceof Part) {
return parseContentObject(content)
return parseContentObject(content, traceId, spanId)
.thenApply(
parsed -> {
ObjectNode summaryNode = mapper.createObjectNode();
Expand All @@ -249,10 +281,13 @@ CompletableFuture<ParsedContent> parse(Object content) {
* Parses a Content or Part object into summary text and content parts.
*
* @param content the Content or Part object to parse
* @param traceId the trace ID for GCS path
* @param spanId the span ID for GCS path
* @return a CompletableFuture of ParsedContentObject containing parts, summary, and truncation
* flag
*/
private CompletableFuture<ParsedContentObject> parseContentObject(Object content) {
private CompletableFuture<ParsedContentObject> parseContentObject(
Object content, String traceId, String spanId) {
List<Part> parts;
if (content instanceof Content c) {
parts = c.parts().orElse(ImmutableList.of());
Expand All @@ -265,7 +300,7 @@ private CompletableFuture<ParsedContentObject> parseContentObject(Object content

List<CompletableFuture<TruncationResult>> partFutures = new ArrayList<>();
for (int i = 0; i < parts.size(); i++) {
partFutures.add(processPart(parts.get(i), i));
partFutures.add(processPart(parts.get(i), i, traceId, spanId));
}

return CompletableFuture.allOf(partFutures.toArray(new CompletableFuture<?>[0]))
Expand Down Expand Up @@ -295,7 +330,8 @@ private CompletableFuture<ParsedContentObject> parseContentObject(Object content
});
}

private CompletableFuture<TruncationResult> processPart(Part part, int index) {
private CompletableFuture<TruncationResult> processPart(
Part part, int index, String traceId, String spanId) {
ContentPart.Builder partBuilder =
ContentPart.builder()
.setPartIndex(index)
Expand All @@ -320,17 +356,89 @@ private CompletableFuture<TruncationResult> processPart(Part part, int index) {
if (part.inlineData().isPresent()) {
Blob blob = part.inlineData().get();
String mimeType = blob.mimeType().orElse("application/octet-stream");
partBuilder.setText(BINARY_DATA_MESSAGE).setMimeType(mimeType);
return CompletableFuture.completedFuture(
TruncationResult.create(mapper.valueToTree(partBuilder.build()), false));
if (logMultiModalContent && offloader != null) {
String ext = DEFAULT_EXTENSION;
try {
ext = MIME_TYPES.forName(mimeType).getExtension();
} catch (MimeTypeException e) {
logger.log(Level.WARNING, "Failed to get extension for mime type " + mimeType, e);
}
String path =
String.format(
"%s/%s/%s_p%d_%s%s",
getLocalDate(), traceId, spanId, index, UUID.randomUUID(), ext);
return offloader
.uploadContent(blob.data().orElse(new byte[0]), mimeType, path)
.handle(
(uri, ex) -> {
if (ex != null) {
logger.log(Level.WARNING, "Failed to offload content to GCS", ex);
partBuilder.setText(UPLOAD_FAILED_MESSAGE);
} else {
ObjectNode details = mapper.createObjectNode();
ObjectNode gcsMetadata = details.putObject("gcs_metadata");
gcsMetadata.put("content_type", mimeType);

partBuilder
.setStorageMode("GCS_REFERENCE")
.setUri(uri)
.setMimeType(mimeType)
.setText(MEDIA_OFFLOADED_MESSAGE)
.setObjectRef(
mapper.valueToTree(ObjectRef.create(uri, null, connectionId, details)));
}
return TruncationResult.create(mapper.valueToTree(partBuilder.build()), false);
});
} else {
partBuilder.setText(BINARY_DATA_MESSAGE).setMimeType(mimeType);
return CompletableFuture.completedFuture(
TruncationResult.create(mapper.valueToTree(partBuilder.build()), false));
}
}
// CASE C: Text
if (part.text().isPresent()) {
String text = part.text().get();
TruncationResult res = truncateWithStatus(text, maxLength);
partBuilder.setText(res.node().asText());
return CompletableFuture.completedFuture(
TruncationResult.create(mapper.valueToTree(partBuilder.build()), res.isTruncated()));
int textLen = Utf8.encodedLength(text);
int offloadThreshold = Math.min(INLINE_TEXT_LIMIT, maxLength);

if (offloader != null && textLen > offloadThreshold) {

String path =
String.format(
"%s/%s/%s_p%d_%s.txt", getLocalDate(), traceId, spanId, index, UUID.randomUUID());
return offloader
.uploadContent(text, "text/plain", path)
.handle(
(uri, ex) -> {
if (ex != null) {
logger.log(Level.WARNING, "Failed to offload text to GCS", ex);
TruncationResult res = truncateWithStatus(text, maxLength);
partBuilder.setText(res.node().asText());
return TruncationResult.create(
mapper.valueToTree(partBuilder.build()), res.isTruncated());
} else {
ObjectNode details = mapper.createObjectNode();
ObjectNode gcsMetadata = details.putObject("gcs_metadata");
gcsMetadata.put("content_type", "text/plain");

partBuilder
.setStorageMode("GCS_REFERENCE")
.setUri(uri)
.setMimeType("text/plain")
.setText(
truncateAndAddSuffix(
text, MAX_OFFLOADED_TEXT_LENGTH, TEXT_OFFLOADED_SUFFIX))
.setObjectRef(
mapper.valueToTree(ObjectRef.create(uri, null, connectionId, details)));
return TruncationResult.create(mapper.valueToTree(partBuilder.build()), true);
}
});
} else {
TruncationResult res = truncateWithStatus(text, maxLength);
partBuilder.setText(res.node().asText());
return CompletableFuture.completedFuture(
TruncationResult.create(mapper.valueToTree(partBuilder.build()), res.isTruncated()));
}
}
if (part.functionCall().isPresent()) {
FunctionCall fc = part.functionCall().get();
Expand Down Expand Up @@ -379,4 +487,8 @@ ArrayNode formatContentParts(Optional<Content> content) {
}
return partsArray;
}

private LocalDate getLocalDate() {
return Instant.now().atZone(ZoneOffset.UTC).toLocalDate();
}
}
Loading