diff --git a/README.md b/README.md
index cd7c867..2e6661a 100644
--- a/README.md
+++ b/README.md
@@ -29,3 +29,4 @@
 The service can be configured with the following environment variables:
 - `CRON_PATTERN` [string]: the cron pattern which the cronjob should use. (default: `* 0 * * * *`)
 - `LDES_ENDPOINT_HEADER_<key>` [string]: A header key-value combination which should be send as part of the headers to the LDES ENDPOINT. E.g. `LDES_ENDPOINT_HEADER_X-API-KEY: <api-key>`.
+- `SPARQL_ENDPOINT_HEADER_<key>` [string]: A header key-value combination which should be sent as part of the headers to the SPARQL ENDPOINT.
diff --git a/app.ts b/app.ts
index 979356f..1341ae9 100644
--- a/app.ts
+++ b/app.ts
@@ -8,9 +8,14 @@ import {
 import { DataFactory } from "n3";
 import * as RDF from "rdf-js";
 import Consumer, { Member } from "ldes-consumer";
-import { convertBlankNodes, extractBaseResourceUri, extractLDESEndpointHeadersFromEnv } from "./utils";
+import { convertBlankNodes, extractBaseResourceUri, extractEndpointHeadersFromEnv } from "./utils";
 import { CronJob } from "cron";
-import { CRON_PATTERN, LDES_ENDPOINT_VIEW, REPLACE_VERSIONS } from "./config";
+import {
+  CRON_PATTERN,
+  LDES_ENDPOINT_HEADER_PREFIX,
+  LDES_ENDPOINT_VIEW,
+  REPLACE_VERSIONS,
+} from "./config";
 const { quad, variable } = DataFactory;
 
 async function processMember(member: Member) {
@@ -42,7 +47,7 @@ const consumerJob = new CronJob(CRON_PATTERN, async () => {
   const consumer = new Consumer({
     endpoint,
     initialState,
-    requestHeaders: extractLDESEndpointHeadersFromEnv()
+    requestHeaders: extractEndpointHeadersFromEnv(LDES_ENDPOINT_HEADER_PREFIX)
   });
   consumer.listen(
     async (member) => {
diff --git a/config.ts b/config.ts
index 444b47a..411434b 100644
--- a/config.ts
+++ b/config.ts
@@ -5,3 +5,4 @@ export const LDES_RELATION_PATH = "http://www.w3.org/ns/prov#generatedAtTime";
 export const MU_APPLICATION_GRAPH = process.env.MU_APPLICATION_GRAPH;
 export const CRON_PATTERN = process.env.CRON_PATTERN || "0 * * * * *";
 export const LDES_ENDPOINT_HEADER_PREFIX = 'LDES_ENDPOINT_HEADER_'
+export const SPARQL_ENDPOINT_HEADER_PREFIX = 'SPARQL_ENDPOINT_HEADER_'
diff --git a/sparql-queries.ts b/sparql-queries.ts
index fbf1449..c80dcd7 100644
--- a/sparql-queries.ts
+++ b/sparql-queries.ts
@@ -1,5 +1,5 @@
 import * as RDF from "rdf-js";
-import { fromDate, toString } from "./utils";
+import { extractEndpointHeadersFromEnv, fromDate, toString } from "./utils";
 import { querySudo as query, updateSudo as update } from "@lblod/mu-auth-sudo";
 import { DataFactory } from "n3";
 import { LDES, PROV, PURL, TREE } from "./namespaces";
@@ -9,6 +9,8 @@ const { quad, namedNode, variable, literal } = DataFactory;
 
 const stream = namedNode(LDES_STREAM);
 
+const SPARQL_ENDPOINT_HEADERS = extractEndpointHeadersFromEnv(SPARQL_ENDPOINT_HEADER_PREFIX);
+
 function constructTriplesString(quads: RDF.Quad[]) {
   let triplesString = quads.map(toString).join("\n");
   return triplesString;
 }
@@ -61,7 +63,7 @@ export function constructSelectQuery(
 export async function executeInsertQuery(quads: RDF.Quad[]) {
   let queryStr = constructInsertQuery(quads);
   try {
-    await update(queryStr);
+    await update(queryStr, SPARQL_ENDPOINT_HEADERS);
   } catch (e) {
     console.error(e);
   }
@@ -70,7 +72,7 @@ export async function executeDeleteQuery(quads: RDF.Quad[]) {
   let queryStr = constructDeleteQuery(quads);
   try {
-    await update(queryStr);
+    await update(queryStr, SPARQL_ENDPOINT_HEADERS);
   } catch (e) {
     console.error(e);
   }
@@ -80,14 +82,8 @@ export async function executeDeleteInsertQuery(
   quadsToDelete: RDF.Quad[],
   quadsToInsert: RDF.Quad[]
 ) {
-  let deleteQuery = constructDeleteQuery(quadsToDelete);
-  let insertQuery = constructInsertQuery(quadsToInsert);
-  try {
-    await update(deleteQuery);
-    await update(insertQuery);
-  } catch (e) {
-    console.error(e);
-  }
+  await executeDeleteQuery(quadsToDelete);
+  await executeInsertQuery(quadsToInsert);
 }
 
 export async function fetchState(): Promise {
@@ -97,7 +93,7 @@ export async function fetchState(): Promise {
   let variables = [variable("state")];
   const sparql_query = constructSelectQuery(variables, quads);
   try {
-    const response = await query(sparql_query);
+    const response = await query(sparql_query, SPARQL_ENDPOINT_HEADERS);
     const stateString = extractVariableFromResponse(response, "state")?.shift();
     if (stateString) {
       return JSON.parse(stateString);
     }
@@ -125,7 +121,7 @@ export async function getVersion(resource: RDF.NamedNode) {
   const sparql_query = constructSelectQuery(variables, quads);
 
   try {
-    const response = await query(sparql_query);
+    const response = await query(sparql_query, SPARQL_ENDPOINT_HEADERS);
     const versionUris = extractVariableFromResponse(response, "v");
     if (versionUris) {
       return namedNode(versionUris[0]);
diff --git a/utils.ts b/utils.ts
index 16be9eb..f367cb3 100644
--- a/utils.ts
+++ b/utils.ts
@@ -65,13 +65,13 @@ export function extractBaseResourceUri(
   return;
 }
 
-export function extractLDESEndpointHeadersFromEnv() {
+export function extractEndpointHeadersFromEnv(prefix: string) {
   const headers: {
     [key: string]: number | string | string[];
   } = {};
   for (const [key, value] of Object.entries(process.env)) {
-    if (key.startsWith(LDES_ENDPOINT_HEADER_PREFIX)) {
-      const headerKey = key.split(LDES_ENDPOINT_HEADER_PREFIX).pop();
+    if (key.startsWith(prefix)) {
+      const headerKey = key.split(prefix).pop();
       if (headerKey && value) {
         headers[headerKey.toLowerCase()] = value;
       }
@@ -79,3 +79,5 @@ export function extractLDESEndpointHeadersFromEnv() {
   }
   return headers;
 }
+
+
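
For reference, below is a minimal standalone sketch of the prefix-based header extraction this patch generalizes. The function body mirrors extractEndpointHeadersFromEnv from utils.ts above and the constants mirror config.ts, but the environment variable names and secret values are made-up examples, not part of the patch.

// Standalone sketch of the prefix-based header extraction used by this patch.
// The constants mirror config.ts; the env values below are illustrative only.
const LDES_ENDPOINT_HEADER_PREFIX = "LDES_ENDPOINT_HEADER_";
const SPARQL_ENDPOINT_HEADER_PREFIX = "SPARQL_ENDPOINT_HEADER_";

// Same logic as extractEndpointHeadersFromEnv in utils.ts: collect every
// environment variable that starts with the given prefix and turn the rest
// of its name into a lower-cased header key.
function extractEndpointHeadersFromEnv(prefix: string) {
  const headers: { [key: string]: number | string | string[] } = {};
  for (const [key, value] of Object.entries(process.env)) {
    if (key.startsWith(prefix)) {
      const headerKey = key.split(prefix).pop();
      if (headerKey && value) {
        headers[headerKey.toLowerCase()] = value;
      }
    }
  }
  return headers;
}

// Hypothetical container configuration:
process.env["LDES_ENDPOINT_HEADER_X-API-KEY"] = "ldes-secret";
process.env["SPARQL_ENDPOINT_HEADER_X-API-KEY"] = "sparql-secret";

// Each endpoint only receives the headers registered under its own prefix:
console.log(extractEndpointHeadersFromEnv(LDES_ENDPOINT_HEADER_PREFIX));   // { "x-api-key": "ldes-secret" }
console.log(extractEndpointHeadersFromEnv(SPARQL_ENDPOINT_HEADER_PREFIX)); // { "x-api-key": "sparql-secret" }

In the patch itself, the SPARQL header map is passed as the second argument to query/update from @lblod/mu-auth-sudo in sparql-queries.ts, while the LDES header map is handed to the ldes-consumer Consumer as requestHeaders in app.ts, so the two endpoints can be authenticated independently.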