Skip to content

Commit

Permalink
feat: webhook self call for Eventarc-triggered events (#33)
Browse files Browse the repository at this point in the history
  • Loading branch information
nicain authored Jul 11, 2023
1 parent ee8ed5b commit c86b3d6
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 2 deletions.
2 changes: 1 addition & 1 deletion deploy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# limitations under the License.
set -e

export PROJECT_ID="velociraptor-16p1-test-0"
export PROJECT_ID="velociraptor-16p1-test-13"
export TF_PLAN_STORAGE_BUCKET="${PROJECT_ID?}-tf"
export BUCKET_NAME=${TF_PLAN_STORAGE_BUCKET?}-main
export TERRAFORM_IMAGE="hashicorp/terraform:1.4.6"
Expand Down
1 change: 1 addition & 0 deletions main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ resource "google_service_account" "webhook" {
resource "google_project_iam_member" "webhook_sa_roles" {
project = var.project_id
for_each = toset([
"roles/run.invoker",
"roles/cloudfunctions.invoker",
"roles/storage.admin",
"roles/logging.logWriter",
Expand Down
45 changes: 44 additions & 1 deletion webhook/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
from google.cloud import logging
import vertexai
from vertexai.preview.language_models import TextGenerationModel
import google.auth.transport.requests
import google.oauth2.id_token
import requests
import flask

from bigquery import write_summarization_to_table
from document_extract import async_document_extract
Expand Down Expand Up @@ -68,23 +72,62 @@ def summarize_text(text: str, parameters: None | dict[str, int | float] = None)
return response.text


def redirect_and_reply(previous_data):
endpoint = f'https://{_LOCATION}-{_PROJECT_ID}.cloudfunctions.net/{os.environ["K_SERVICE"]}'
logging_client = logging.Client()
logger = logging_client.logger(_FUNCTIONS_VERTEX_EVENT_LOGGER)

auth_req = google.auth.transport.requests.Request()
id_token = google.oauth2.id_token.fetch_id_token(auth_req, endpoint)
data = {
'name': previous_data["name"],
'id': previous_data["id"],
'bucket': previous_data["bucket"],
'timeCreated': previous_data["timeCreated"],
}
headers = {}
headers["Authorization"] = f"Bearer {id_token}"
logger.log(f'TRIGGERING JOB FLOW: {endpoint}')
try:
requests.post(
endpoint,
json=data,
timeout=1,
headers=headers,
)
except requests.exceptions.Timeout:
return flask.Response(status=200)
except Exception:
return flask.Response(status=500)
return flask.Response(status=200)


def entrypoint(request: object) -> dict[str, str]:
data = request.get_json()
if data.get("kind", None) == "storage#object":
# Entrypoint called by Pub-Sub (Eventarc)
return redirect_and_reply(data)

if 'bucket' in data:
# Entrypoint called by REST (possibly by redirect_and_replay)
return cloud_event_entrypoint(
name=data["name"],
event_id=data["id"],
bucket=data["bucket"],
time_created=coerce_datetime_zulu(data["timeCreated"]),
)
else:

if "text" in data:
# Entrypoint called by REST.
return summarization_entrypoint(
name=data["name"],
extracted_text=data["text"],
time_created=datetime.datetime.now(datetime.timezone.utc),
event_id="CURL_TRIGGER",
)

return flask.Response(status=500)


def cloud_event_entrypoint(event_id, bucket, name, time_created):
orig_pdf_uri = f"gs://{bucket}/{name}"
Expand Down
1 change: 1 addition & 0 deletions webhook/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ google-cloud-bigquery==3.10.0
google-cloud-logging==3.5.0
google-cloud-storage==2.9.0
google-cloud-vision==3.4.1
requests==2.31.0

0 comments on commit c86b3d6

Please sign in to comment.