diff --git a/src/database.ts b/src/database.ts index 7b8bf734..5e177ec1 100644 --- a/src/database.ts +++ b/src/database.ts @@ -30,6 +30,8 @@ interface IExtensions { storeState: StoreStateRepo; } +export type DatabaseClient = IDatabase & IExtensions & IMain; + // pg-promise initialization options: const initOptions: IOptions = { promiseLib: promise, @@ -60,7 +62,7 @@ const config = { const pgp: IMain = pgPromise(initOptions); // Create the database instance with extensions: -const db = pgp(config) as IDatabase & IExtensions & IMain; +const db = pgp(config) as DatabaseClient; // Load and initialize optional diagnostics: import diagnostics = require("./util/db/diagnostics"); diff --git a/src/dgraph_import.ts b/src/dgraph_import.ts index 947c0abf..dcc1f6cb 100644 --- a/src/dgraph_import.ts +++ b/src/dgraph_import.ts @@ -1,6 +1,6 @@ import parseArgv from "minimist"; import { Cursor, ICursorResult } from "./ingest/cursor"; -import { Connection } from "./storage/connection"; +import { Dgraph } from "./storage/dgraph"; import logger from "./util/logger"; import "./util/memo"; import { DGRAPH_URL } from "./util/secrets"; @@ -21,7 +21,7 @@ try { process.exit(-1); } -const c = new Connection(); +const c = new Dgraph(); setStellarNetwork().then((network: string) => { logger.info(`Using ${network}`); diff --git a/src/graphql.ts b/src/graphql.ts index 3c819ac4..d601434e 100644 --- a/src/graphql.ts +++ b/src/graphql.ts @@ -4,9 +4,10 @@ import { ApolloError, ApolloServer } from "apollo-server"; import { GraphQLError } from "graphql"; import Honeybadger from "honeybadger"; +import { db } from "./database"; import { Cursor, Worker } from "./ingest"; import schema from "./schema"; -import { Connection } from "./storage/connection"; +import { Dgraph } from "./storage/dgraph"; import logger from "./util/logger"; import { BIND_ADDRESS, DEBUG_LEDGER, DGRAPH_URL, INGEST_INTERVAL, PORT } from "./util/secrets"; import { setNetwork as setStellarNetwork } from "./util/stellar"; @@ -18,6 +19,7 @@ const server = new ApolloServer({ playground: true, debug: true, cors: true, + context: () => ({ dgraph: new Dgraph(), db }), formatError: (error: ApolloError) => { logger.error(error); @@ -43,7 +45,7 @@ setStellarNetwork().then((network: string) => { if (DGRAPH_URL) { logger.info(`[DGraph] Updating schema...`); - new Connection().migrate().catch((err: any) => { + new Dgraph().migrate().catch((err: any) => { logger.error(err); Honeybadger.notify(err); process.exit(-1); diff --git a/src/ingest/worker.ts b/src/ingest/worker.ts index 23985cb9..9896aaa8 100644 --- a/src/ingest/worker.ts +++ b/src/ingest/worker.ts @@ -3,7 +3,7 @@ import { Cursor } from "./cursor"; import { SubscriptionPayloadCollection } from "./subscription_payload_collection"; -import { Connection } from "../storage/connection"; +import { Dgraph } from "../storage/dgraph"; import { DGRAPH_URL } from "../util/secrets"; export class Worker { @@ -23,7 +23,7 @@ export class Worker { await Publisher.publish(header, collection); if (DGRAPH_URL) { - const c = new Connection(); + const c = new Dgraph(); await c.importLedgerTransactions(header, transactions); await c.importLedgerState(header, transactions); c.close(); diff --git a/src/schema/resolvers/account.ts b/src/schema/resolvers/account.ts index 8c755858..8d6a13bd 100644 --- a/src/schema/resolvers/account.ts +++ b/src/schema/resolvers/account.ts @@ -1,18 +1,17 @@ +import _ from "lodash"; import { Account, DataEntry, Signer, TrustLine } from "../../model"; import { withFilter } from "graphql-subscriptions"; import { createBatchResolver, eventMatches, ledgerResolver } from "./util"; -import { db } from "../../database"; import { joinToMap } from "../../util/array"; import { ACCOUNT, pubsub } from "../../pubsub"; +import { IApolloContext } from "../../util/types"; -const fetchIDs = (r: any) => r.id; - -const signersResolver = createBatchResolver(async (source: any) => { - const accountIDs = source.map(fetchIDs); - const signers = await db.signers.findAllByAccountIDs(accountIDs); +const signersResolver = createBatchResolver(async (source: any, args: any, ctx: IApolloContext) => { + const accountIDs = _.map(source, "id"); + const signers = await ctx.db.signers.findAllByAccountIDs(accountIDs); const map = joinToMap(accountIDs, signers); @@ -30,23 +29,25 @@ const signersResolver = createBatchResolver(async (source: an return signers; }); -const dataEntriesResolver = createBatchResolver((source: any) => - db.dataEntries.findAllByAccountIDs(source.map(fetchIDs)) +const dataEntriesResolver = createBatchResolver((source: any, args: any, ctx: IApolloContext) => + ctx.db.dataEntries.findAllByAccountIDs(_.map(source, "id")) ); -const trustLinesResolver = createBatchResolver(async (source: any) => { - const accountIDs = source.map(fetchIDs); - const trustLines = await db.trustLines.findAllByAccountIDs(accountIDs); +const trustLinesResolver = createBatchResolver( + async (source: any, args: any, ctx: IApolloContext) => { + const accountIDs = _.map(source, "id"); + const trustLines = await ctx.db.trustLines.findAllByAccountIDs(accountIDs); - const map = joinToMap(accountIDs, trustLines); + const map = joinToMap(accountIDs, trustLines); - for (const [accountID, accountTrustLines] of map) { - const account = source.find((acc: Account) => acc.id === accountID); - accountTrustLines.unshift(TrustLine.buildFakeNative(account)); - } + for (const [accountID, accountTrustLines] of map) { + const account = source.find((acc: Account) => acc.id === accountID); + accountTrustLines.unshift(TrustLine.buildFakeNative(account)); + } - return trustLines; -}); + return trustLines; + } +); const accountSubscription = (event: string) => { return { @@ -57,14 +58,14 @@ const accountSubscription = (event: string) => { } ), - resolve(payload: any, args: any, ctx: any, info: any) { + resolve(payload: any, args: any, ctx: IApolloContext, info: any) { return payload; } }; }; -const signerForResolver = async (subject: Account, args: any) => { - const accounts = db.accounts.findAllBySigner(subject.id, args.first); +const signerForResolver = async (subject: Account, args: any, ctx: IApolloContext) => { + const accounts = ctx.db.accounts.findAllBySigner(subject.id, args.first); return [subject].concat(await accounts); }; @@ -77,20 +78,20 @@ export default { signerFor: signerForResolver }, Query: { - account(root: any, args: any, ctx: any, info: any) { - return db.accounts.findByID(args.id); + account(root: any, args: any, ctx: IApolloContext, info: any) { + return ctx.db.accounts.findByID(args.id); }, - accounts(root: any, args: any, ctx: any, info: any) { - return db.accounts.findAllByIDs(args.id); + accounts(root: any, args: any, ctx: IApolloContext, info: any) { + return ctx.db.accounts.findAllByIDs(args.id); }, - async accountsSignedBy(root: any, args: any, ctx: any, info: any) { - const account = await db.accounts.findByID(args.id); + async accountsSignedBy(root: any, args: any, ctx: IApolloContext, info: any) { + const account = await ctx.db.accounts.findByID(args.id); if (!account) { return []; } - return [account].concat(await db.accounts.findAllBySigner(args.id, args.first)); + return [account].concat(await ctx.db.accounts.findAllBySigner(args.id, args.first)); } }, Subscription: { diff --git a/src/schema/resolvers/asset.ts b/src/schema/resolvers/asset.ts index 6f5eaf66..5b667d9b 100644 --- a/src/schema/resolvers/asset.ts +++ b/src/schema/resolvers/asset.ts @@ -1,9 +1,9 @@ -import { db } from "../../database"; +import { IApolloContext } from "../../util/types"; export default { Query: { - async assets(root: any, args: any, ctx: any, info: any) { - const data = await db.assets.findAll(args.code, args.issuer, args.first, args.offset); + async assets(root: any, args: any, ctx: IApolloContext, info: any) { + const data = await ctx.db.assets.findAll(args.code, args.issuer, args.first, args.offset); return data.map(asset => { return { diff --git a/src/schema/resolvers/data_entry.ts b/src/schema/resolvers/data_entry.ts index e46d33ee..663efd31 100644 --- a/src/schema/resolvers/data_entry.ts +++ b/src/schema/resolvers/data_entry.ts @@ -3,12 +3,11 @@ import { Account, DataEntry } from "../../model"; import { withFilter } from "graphql-subscriptions"; import { createBatchResolver, eventMatches, ledgerResolver } from "./util"; -import { db } from "../../database"; - import { DATA_ENTRY, pubsub } from "../../pubsub"; +import { IApolloContext } from "../../util/types"; -const accountResolver = createBatchResolver((source: any) => - db.accounts.findAllByIDs(source.map((r: DataEntry) => r.accountID)) +const accountResolver = createBatchResolver((source: any, args: any, ctx: IApolloContext) => + ctx.db.accounts.findAllByIDs(source.map((r: DataEntry) => r.accountID)) ); const dataEntrySubscription = (event: string) => { @@ -20,7 +19,7 @@ const dataEntrySubscription = (event: string) => { } ), - resolve(payload: any, args: any, ctx: any, info: any) { + resolve(payload: any, args: any, ctx: IApolloContext, info: any) { return payload; } }; @@ -35,8 +34,8 @@ export default { dataEntry: dataEntrySubscription(DATA_ENTRY) }, Query: { - dataEntries(root: any, args: any, ctx: any, info: any) { - return db.dataEntries.findAllByAccountID(args.id); + dataEntries(root: any, args: any, ctx: IApolloContext, info: any) { + return ctx.db.dataEntries.findAllByAccountID(args.id); } } }; diff --git a/src/schema/resolvers/ledger.ts b/src/schema/resolvers/ledger.ts index 717242f1..f05e4421 100644 --- a/src/schema/resolvers/ledger.ts +++ b/src/schema/resolvers/ledger.ts @@ -1,12 +1,12 @@ -import { db } from "../../database"; import { Ledger, LedgerHeader } from "../../model"; import { pubsub } from "../../pubsub"; +import { IApolloContext } from "../../util/types"; import { createBatchResolver } from "./util"; const LEDGER_CREATED = "LEDGER_CREATED"; -const ledgerHeaderResolver = createBatchResolver((source: any) => - db.ledgerHeaders.findAllBySeq(source.map((r: Ledger) => r.seq)) +const ledgerHeaderResolver = createBatchResolver((source: any, args: any, ctx: IApolloContext) => + ctx.db.ledgerHeaders.findAllBySeq(source.map((r: Ledger) => r.seq)) ); export default { @@ -14,16 +14,16 @@ export default { header: ledgerHeaderResolver }, Query: { - ledger(root: any, args: any, ctx: any, info: any) { + ledger(root: any, args: any, ctx: IApolloContext, info: any) { return new Ledger(args.seq); }, - ledgers(root: any, args: any, ctx: any, info: any) { + ledgers(root: any, args: any, ctx: IApolloContext, info: any) { return args.seq.map((seq: number) => new Ledger(seq)); } }, Subscription: { ledgerCreated: { - resolve(payload: any, args: any, ctx: any, info: any) { + resolve(payload: any, args: any, ctx: IApolloContext, info: any) { return payload; }, subscribe() { diff --git a/src/schema/resolvers/offer.ts b/src/schema/resolvers/offer.ts index 15566eb4..0bb68b39 100644 --- a/src/schema/resolvers/offer.ts +++ b/src/schema/resolvers/offer.ts @@ -4,12 +4,11 @@ import Asset from "../../util/asset"; import { withFilter } from "graphql-subscriptions"; import { assetResolver, createBatchResolver, eventMatches, ledgerResolver } from "./util"; -import { db } from "../../database"; - import { OFFER, pubsub } from "../../pubsub"; +import { IApolloContext } from "../../util/types"; -const accountResolver = createBatchResolver((source: any) => - db.accounts.findAllByIDs(source.map((r: Offer) => r.sellerid)) +const accountResolver = createBatchResolver((source: any, args: any, ctx: IApolloContext) => + ctx.db.accounts.findAllByIDs(source.map((r: Offer) => r.sellerid)) ); const assetFromArg = (arg: IAssetInput): Asset | null => { @@ -51,7 +50,7 @@ const offerSubscription = (event: string) => { (payload, variables) => offerMatches(variables, payload) ), - resolve(payload: any, args: any, ctx: any, info: any) { + resolve(payload: any, args: any, ctx: IApolloContext, info: any) { return payload; } }; @@ -65,8 +64,8 @@ export default { ledger: ledgerResolver }, Query: { - offers(root: any, args: any, ctx: any, info: any) { - return db.offers.findAll(args.seller, args.selling, args.buying, args.first, args.limit); + offers(root: any, args: any, ctx: IApolloContext, info: any) { + return ctx.db.offers.findAll(args.seller, args.selling, args.buying, args.first, args.limit); } }, Subscription: { diff --git a/src/schema/resolvers/operation.ts b/src/schema/resolvers/operation.ts index 1b228d1d..4843c3d0 100644 --- a/src/schema/resolvers/operation.ts +++ b/src/schema/resolvers/operation.ts @@ -1,11 +1,11 @@ import _ from "lodash"; -import { Connection } from "../../storage/connection"; import { OperationsQuery } from "../../storage/queries/operations"; import { OperationKinds } from "../../storage/queries/operations/types"; +import { IApolloContext } from "../../util/types"; export default { IOperation: { - __resolveType(obj: any, context: any, info: any) { + __resolveType(obj: any, context: IApolloContext, info: any) { switch (obj.kind) { case OperationKinds.Payment: return "PaymentOperation"; @@ -33,11 +33,10 @@ export default { } }, Query: { - operations(root: any, args: any, ctx: any, info: any) { + operations(root: any, args: any, ctx: IApolloContext, info: any) { const { account, first, offset, filters } = args; - const conn = new Connection(); - const query = new OperationsQuery(conn, account, filters, first, offset); + const query = new OperationsQuery(ctx.dgraph, account, filters, first, offset); return query.call(); } diff --git a/src/schema/resolvers/signer.ts b/src/schema/resolvers/signer.ts index 89a08d80..1288170b 100644 --- a/src/schema/resolvers/signer.ts +++ b/src/schema/resolvers/signer.ts @@ -1,13 +1,13 @@ -import { db } from "../../database"; import { Account, Signer } from "../../model"; +import { IApolloContext } from "../../util/types"; import { createBatchResolver } from "./util"; -const accountResolver = createBatchResolver((source: any) => - db.accounts.findAllByIDs(source.map((r: Signer) => r.accountID)) +const accountResolver = createBatchResolver((source: any, args: any, ctx: IApolloContext) => + ctx.db.accounts.findAllByIDs(source.map((r: Signer) => r.accountID)) ); -const signerResolver = createBatchResolver((source: any) => - db.accounts.findAllByIDs(source.map((r: Signer) => r.signer)) +const signerResolver = createBatchResolver((source: any, args: any, ctx: IApolloContext) => + ctx.db.accounts.findAllByIDs(source.map((r: Signer) => r.signer)) ); export default { @@ -16,11 +16,11 @@ export default { signer: signerResolver }, Query: { - async signers(root: any, args: any, ctx: any, info: any) { - const account = await db.accounts.findByID(args.id); + async signers(root: any, args: any, ctx: IApolloContext, info: any) { + const account = await ctx.db.accounts.findByID(args.id); if (account !== null) { - const signers = await db.signers.findAllByAccountID(args.id); + const signers = await ctx.db.signers.findAllByAccountID(args.id); signers.unshift( new Signer({ diff --git a/src/schema/resolvers/transaction.ts b/src/schema/resolvers/transaction.ts index 140b74ae..8e79cacd 100644 --- a/src/schema/resolvers/transaction.ts +++ b/src/schema/resolvers/transaction.ts @@ -1,6 +1,5 @@ -import { db } from "../../database"; -import { Connection as DgraphConnection } from "../../storage/connection"; import { AccountTransactionsQuery } from "../../storage/queries/account_transactions"; +import { IApolloContext } from "../../util/types"; import { ledgerResolver, memoResolver } from "./util"; export default { @@ -9,15 +8,14 @@ export default { memo: memoResolver }, Query: { - transaction(root: any, args: any, ctx: any, info: any) { - return db.transactions.findByID(args.id); + transaction(root: any, args: any, ctx: IApolloContext, info: any) { + return ctx.db.transactions.findByID(args.id); }, - transactions(root: any, args: any, ctx: any, info: any) { - return db.transactions.findAllByID(args.id); + transactions(root: any, args: any, ctx: IApolloContext, info: any) { + return ctx.db.transactions.findAllByID(args.id); }, - accountTransactions(root: any, args: any, ctx: any, info: any) { - const dc = new DgraphConnection(); - const query = new AccountTransactionsQuery(dc, args.id, args.first, args.offset); + accountTransactions(root: any, args: any, ctx: IApolloContext, info: any) { + const query = new AccountTransactionsQuery(ctx.dgraph, args.id, args.first, args.offset); return query.call(); } diff --git a/src/schema/resolvers/trust_line.ts b/src/schema/resolvers/trust_line.ts index da23c57d..4497338c 100644 --- a/src/schema/resolvers/trust_line.ts +++ b/src/schema/resolvers/trust_line.ts @@ -1,13 +1,13 @@ -import { Account, TrustLine } from "../../model"; -import { assetResolver, createBatchResolver, eventMatches, ledgerResolver } from "./util"; - import { withFilter } from "graphql-subscriptions"; -import { db } from "../../database"; +import { Account, TrustLine } from "../../model"; import { pubsub, TRUST_LINE } from "../../pubsub"; +import { IApolloContext } from "../../util/types"; +import { assetResolver, createBatchResolver, eventMatches, ledgerResolver } from "./util"; -const accountResolver = createBatchResolver((source: ReadonlyArray) => - db.accounts.findAllByIDs(source.map(r => r.accountID)) +const accountResolver = createBatchResolver( + (source: ReadonlyArray, args: any, ctx: IApolloContext) => + ctx.db.accounts.findAllByIDs(source.map(r => r.accountID)) ); const trustLineSubscription = (event: string) => { @@ -19,7 +19,7 @@ const trustLineSubscription = (event: string) => { } ), - resolve(payload: any, args: any, ctx: any, info: any) { + resolve(payload: any, args: any, ctx: IApolloContext, info: any) { return payload; } }; @@ -35,11 +35,11 @@ export default { trustLine: trustLineSubscription(TRUST_LINE) }, Query: { - async trustLines(root: any, args: any, ctx: any, info: any) { - const account = await db.accounts.findByID(args.id); + async trustLines(root: any, args: any, ctx: IApolloContext, info: any) { + const account = await ctx.db.accounts.findByID(args.id); if (account !== null) { - const trustLines = await db.trustLines.findAllByAccountID(args.id); + const trustLines = await ctx.db.trustLines.findAllByAccountID(args.id); trustLines.unshift(TrustLine.buildFakeNative(account)); diff --git a/src/storage/cache.ts b/src/storage/cache.ts index d59f0aa3..5027c019 100644 --- a/src/storage/cache.ts +++ b/src/storage/cache.ts @@ -1,5 +1,5 @@ import _ from "lodash"; -import { Connection } from "../storage/connection"; +import { Dgraph } from "../storage/dgraph"; import { ILink, NQuad, NQuads } from "./nquads"; const ___cache = new Map(); @@ -7,7 +7,7 @@ const ___cache = new Map(); // An idea is simple: every node in DGraph database has unique string key, which we use either to distinguish // it in in-memory cache and to load uid from DGraph. export class Cache { - constructor(private connection: Connection, private nquads: NQuads) {} + constructor(private connection: Dgraph, private nquads: NQuads) {} public async populate(): Promise { const { hits, misses } = this.hitsAndMisses(); diff --git a/src/storage/connection.ts b/src/storage/dgraph.ts similarity index 98% rename from src/storage/connection.ts rename to src/storage/dgraph.ts index 409d453a..8fdc9b3c 100644 --- a/src/storage/connection.ts +++ b/src/storage/dgraph.ts @@ -36,7 +36,7 @@ const SCHEMA = ` offer_id: string @index (exact) . `; -export class Connection { +export class Dgraph { private stub: any; private client: any; @@ -44,6 +44,7 @@ export class Connection { this.stub = new DgraphClientStub(DGRAPH_URL, grpc.credentials.createInsecure()); this.client = new DgraphClient(this.stub); + this.client.setDebugMode(); } public close() { diff --git a/src/storage/queries/account_transactions.ts b/src/storage/queries/account_transactions.ts index fabbf419..b90a6a2e 100644 --- a/src/storage/queries/account_transactions.ts +++ b/src/storage/queries/account_transactions.ts @@ -1,7 +1,7 @@ import dig from "object-dig"; import { Memo } from "stellar-sdk"; import { ITransaction } from "../../model/transaction"; -import { Connection } from "../connection"; +import { Dgraph } from "../dgraph"; import { ITransactionData } from "../types"; import { Query } from "./query"; @@ -12,7 +12,7 @@ export class AccountTransactionsQuery extends Query { private offset: number; constructor( - connection: Connection, + connection: Dgraph, private accountID: string, private filters: IOperationsQueryParams, private first: number, @@ -89,6 +89,7 @@ export class OperationsQuery extends Query { } `; + console.log(query); return this.connection.query(query, { $first: this.first.toString(), $offset: this.offset.toString() diff --git a/src/storage/queries/query.ts b/src/storage/queries/query.ts index 7c745e7b..acbe2d21 100644 --- a/src/storage/queries/query.ts +++ b/src/storage/queries/query.ts @@ -1,9 +1,9 @@ -import { Connection } from "../connection"; +import { Dgraph } from "../dgraph"; export abstract class Query { - protected connection: Connection; + protected connection: Dgraph; - constructor(connection: Connection) { + constructor(connection: Dgraph) { this.connection = connection; } diff --git a/src/util/types.ts b/src/util/types.ts index afa041a3..4209af52 100644 --- a/src/util/types.ts +++ b/src/util/types.ts @@ -1,2 +1,9 @@ +import { DatabaseClient } from "../database"; +import { Dgraph } from "../storage/dgraph"; export type AccountID = string; export type AssetCode = string; + +export interface IApolloContext { + dgraph: Dgraph; + db: DatabaseClient; +} diff --git a/tests/integration/integration.test.ts b/tests/integration/integration.test.ts index cbc43abc..09b344b3 100644 --- a/tests/integration/integration.test.ts +++ b/tests/integration/integration.test.ts @@ -4,6 +4,7 @@ import { graphql } from "graphql"; import path from "path"; import { Client as dbClient } from "pg"; import { Network } from "stellar-base"; +import { db } from "../../src/database"; import schema from "../../src/schema"; import logger from "../../src/util/logger"; import * as secrets from "../../src/util/secrets"; @@ -58,7 +59,11 @@ describe("Integration tests", () => { const queryFile = caseName.toLowerCase().replace(/ /g, "_"); const query = fs.readFileSync(`${__dirname}/integration_queries/${queryFile}.gql`, "utf8"); - const { data } = await graphql(schema, query); + const { data } = await graphql({ + schema, + source: query, + contextValue: { db } + }); expect(data).toMatchSnapshot(); });