From ad25efa47aee7076ae7f58bb6ab9c72b14e236ce Mon Sep 17 00:00:00 2001 From: Benny Date: Wed, 27 Mar 2024 16:17:12 +0100 Subject: [PATCH 1/3] Add xtdb-cli tool to Octopoes --- octopoes/tools/xtdb-cli.py | 258 +++++++++++++++++++++++++++++++++++++ 1 file changed, 258 insertions(+) create mode 100755 octopoes/tools/xtdb-cli.py diff --git a/octopoes/tools/xtdb-cli.py b/octopoes/tools/xtdb-cli.py new file mode 100755 index 00000000000..665fdb78826 --- /dev/null +++ b/octopoes/tools/xtdb-cli.py @@ -0,0 +1,258 @@ +#!/usr/bin/env python + +import argparse +import datetime +import sys +from pathlib import Path + +import httpx + + +class XTDB: + def __init__(self, host: str, port: int, node: str): + self.host = host + self.port = port + self.node = node + + def root(self, target: str = ""): + return f"http://{self.host}:{self.port}/_xtdb/{self.node}/{target}" + + def status(self): + headers = {"Accept": "application/json"} + req = httpx.get(self.root("status"), headers=headers) + return req.text + + def query(self, query: str = "{:query {:find [ ?var ] :where [[?var :xt/id ]]}}"): + headers = {"Accept": "application/json", "Content-Type": "application/edn"} + req = httpx.post(self.root("query"), headers=headers, data=query) + return req.text + + def entity(self, key: str): + headers = {"Accept": "application/json"} + req = httpx.get(self.root(f"entity?eid={key}"), headers=headers) + return req.text + + def history(self, key: str): + headers = {"Accept": "application/json"} + req = httpx.get(self.root(f"entity?eid={key}&history=true&sortOrder=asc"), headers=headers) + return req.text + + def entity_tx(self, key: str): + headers = {"Accept": "application/json"} + req = httpx.get(self.root(f"entity-tx?eid={key}"), headers=headers) + return req.text + + def attribute_stats(self): + headers = {"Accept": "application/json"} + req = httpx.get(self.root("attribute-stats"), headers=headers) + return req.text + + def sync(self, timeout: int = 500): + headers = {"Accept": "application/json"} + req = httpx.get(self.root(f"sync?timeout={timeout}"), headers=headers) + return req.text + + def await_tx(self, txid: int): + headers = {"Accept": "application/json"} + req = httpx.get(self.root(f"await-tx?txId={txid}"), headers=headers) + return req.text + + def await_tx_time(self, tm: str = datetime.datetime.now().isoformat()): + headers = {"Accept": "application/json"} + req = httpx.get(self.root(f"await-tx-time?tx-time={tm}"), headers=headers) + return req.text + + def tx_log(self): + headers = {"Accept": "application/json"} + req = httpx.get(self.root("tx-log"), headers=headers) + return req.text + + def submit_tx(self, txs): + headers = {"Accept": "application/json", "Content-Type": "application/json"} + data = '{{"tx-ops": {}}}'.format(" ".join(txs)) + req = httpx.post(self.root("submit-tx"), headers=headers, data=data) + return req.text + + def tx_committed(self, txid: int): + headers = {"Accept": "application/json"} + req = httpx.get(self.root(f"tx_commited?txId={txid}"), headers=headers) + return req.text + + def latest_completed_tx(self): + headers = {"Accept": "application/json"} + req = httpx.get(self.root("latest-completed-tx"), headers=headers) + return req.text + + def latest_submitted_tx(self): + headers = {"Accept": "application/json"} + req = httpx.get(self.root("latest-submitted-tx"), headers=headers) + return req.text + + def active_queries(self): + headers = {"Accept": "application/json"} + req = httpx.get(self.root("active-queries"), headers=headers) + return req.text + + def recent_queries(self): + headers = {"Accept": "application/json"} + req = httpx.get(self.root("recent-queries"), headers=headers) + return req.text + + def slowest_queries(self): + headers = {"Accept": "application/json"} + req = httpx.get(self.root("recent-queries"), headers=headers) + return req.text + + +def dispatch(xtdb, instruction): + match instruction.pop(0): + case "status": + return xtdb.status() + case "query": + if instruction: + return xtdb.query(instruction[0]) + else: + return xtdb.query() + case "list-keys": + return xtdb.query() + case "list-values": + return xtdb.query("{:query {:find [(pull ?var [*])] :where [[?var :xt/id]]}}") + case "entity": + if instruction: + return xtdb.entity(instruction[0]) + case "history": + if instruction: + return xtdb.entity(instruction[0]) + case "entity-tx": + if instruction: + return xtdb.entity(instruction[0]) + case "attribute-stats": + return xtdb.attribute_stats() + case "sync": + if instruction: + return xtdb.sync(instruction[0]) + else: + return xtdb.sync() + case "await-tx": + if instruction: + return xtdb.await_tx(instruction[0]) + case "await-tx-time": + if instruction: + return xtdb.await_tx_time(instruction[0]) + case "tx-log": + return xtdb.tx_log() + case "submit-tx": + if instruction: + return xtdb.submit_tx(instruction) + case "tx-committed": + if instruction: + return xtdb.tx_committed(instruction.pop(0)) + case "latest-completed-tx": + return xtdb.latest_completed_tx() + case "latest-submitted-tx": + return xtdb.latest_submitted_tx() + case "active-queries": + return xtdb.active_queries() + case "recent-queries": + return xtdb.recent_queries() + case "slowest-queries": + return xtdb.recent_queries() + + +KEYWORDS = [ + "status", + "query", + "list-keys", + "list-values", + "entity", + "history", + "entity-tx", + "attribute-stats", + "sync", + "await-tx", + "await-tx-time", + "tx-log", + "submit-tx", + "tx-committed", + "latest-completed-tx", + "latest-submitted-tx", + "active-queries", + "recent-queries", + "slowest-queries", +] + +EPILOG = """ +As instructions the following keywords with arguments are supported: + status + query [edn-query] + list-keys + list-values + entity [xt/id] + history [xt/id] + entity-tx [xt/id] + attribute-stats + sync [timeout in ms] + await-tx [transaction id] + await-tx-time [time] + tx-log + submit-tx [transaction list] + tx-committed [transaction id] + latest-completed-tx + latest-submitted-tx + active-queries + recent-queries + slowest-queries +See https://v1-docs.xtdb.com/clients/http/ for more information. + +OpenKAT https://openkat.nl/. +""" + + +def iparse(instructions): + idxs = [idx for idx, key in enumerate(instructions) if key in KEYWORDS] + [len(instructions)] + return [instructions[i:j] for i, j in zip(idxs, idxs[1:] + idxs[:1]) if instructions[i:j]] + + +def main(): + parser = argparse.ArgumentParser( + prog="xtdb-cli", + description="A command-line interface for xtdb multinode as used in OpenKAT", + epilog=EPILOG, + add_help=True, + allow_abbrev=True, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument("--port", help="xtdb server port (default 3000)", type=int, default=3000) + parser.add_argument("--host", help="xtdb server hostname (default localhost)", type=str, default="localhost") + parser.add_argument("--node", help="xtdb node (default 0)", type=str, default="0") + parser.add_argument("instructions", type=str, nargs="*") + args = parser.parse_args() + xtdb = XTDB(args.host, args.port, args.node) + if args.instructions: + if args.instructions[0] in KEYWORDS: + for instruction in iparse(args.instructions): + result = dispatch(xtdb, instruction) + if result: + sys.stdout.write(f"{result}\n") + elif args.instructions[0] == "-": + for line in sys.stdin: + if line.rstrip() == "exit" or line.rstrip() == "quit": + break + for instruction in iparse(line.rstrip().split(" ")): + result = dispatch(xtdb, instruction) + if result: + sys.stdout.write(f"{result}\n") + else: + for fname in args.instructions: + with Path(fname).open("r") as file: + for line in file.readlines(): + if line.rstrip() == "exit" or line.rstrip() == "quit": + break + for instruction in iparse(line.rstrip().split(" ")): + result = dispatch(xtdb, instruction) + if result: + sys.stdout.write(f"{result}\n") + + +if __name__ == "__main__": + main() From 9efc251db6cd65359b7bb24febf57c6282479c66 Mon Sep 17 00:00:00 2001 From: Benny Date: Thu, 28 Mar 2024 12:19:15 +0100 Subject: [PATCH 2/3] Some introspection --- octopoes/tools/xtdb-cli.py | 114 +++++++++++-------------------------- 1 file changed, 32 insertions(+), 82 deletions(-) diff --git a/octopoes/tools/xtdb-cli.py b/octopoes/tools/xtdb-cli.py index 665fdb78826..c836fdf5702 100755 --- a/octopoes/tools/xtdb-cli.py +++ b/octopoes/tools/xtdb-cli.py @@ -14,172 +14,119 @@ def __init__(self, host: str, port: int, node: str): self.port = port self.node = node - def root(self, target: str = ""): + def _root(self, target: str = ""): return f"http://{self.host}:{self.port}/_xtdb/{self.node}/{target}" def status(self): headers = {"Accept": "application/json"} - req = httpx.get(self.root("status"), headers=headers) + req = httpx.get(self._root("status"), headers=headers) return req.text def query(self, query: str = "{:query {:find [ ?var ] :where [[?var :xt/id ]]}}"): headers = {"Accept": "application/json", "Content-Type": "application/edn"} - req = httpx.post(self.root("query"), headers=headers, data=query) + req = httpx.post(self._root("query"), headers=headers, data=query) return req.text def entity(self, key: str): headers = {"Accept": "application/json"} - req = httpx.get(self.root(f"entity?eid={key}"), headers=headers) + req = httpx.get(self._root(f"entity?eid={key}"), headers=headers) return req.text def history(self, key: str): headers = {"Accept": "application/json"} - req = httpx.get(self.root(f"entity?eid={key}&history=true&sortOrder=asc"), headers=headers) + req = httpx.get(self._root(f"entity?eid={key}&history=true&sortOrder=asc"), headers=headers) return req.text def entity_tx(self, key: str): headers = {"Accept": "application/json"} - req = httpx.get(self.root(f"entity-tx?eid={key}"), headers=headers) + req = httpx.get(self._root(f"entity-tx?eid={key}"), headers=headers) return req.text def attribute_stats(self): headers = {"Accept": "application/json"} - req = httpx.get(self.root("attribute-stats"), headers=headers) + req = httpx.get(self._root("attribute-stats"), headers=headers) return req.text def sync(self, timeout: int = 500): headers = {"Accept": "application/json"} - req = httpx.get(self.root(f"sync?timeout={timeout}"), headers=headers) + req = httpx.get(self._root(f"sync?timeout={timeout}"), headers=headers) return req.text def await_tx(self, txid: int): headers = {"Accept": "application/json"} - req = httpx.get(self.root(f"await-tx?txId={txid}"), headers=headers) + req = httpx.get(self._root(f"await-tx?txId={txid}"), headers=headers) return req.text def await_tx_time(self, tm: str = datetime.datetime.now().isoformat()): headers = {"Accept": "application/json"} - req = httpx.get(self.root(f"await-tx-time?tx-time={tm}"), headers=headers) + req = httpx.get(self._root(f"await-tx-time?tx-time={tm}"), headers=headers) return req.text def tx_log(self): headers = {"Accept": "application/json"} - req = httpx.get(self.root("tx-log"), headers=headers) + req = httpx.get(self._root("tx-log"), headers=headers) return req.text def submit_tx(self, txs): headers = {"Accept": "application/json", "Content-Type": "application/json"} data = '{{"tx-ops": {}}}'.format(" ".join(txs)) - req = httpx.post(self.root("submit-tx"), headers=headers, data=data) + req = httpx.post(self._root("submit-tx"), headers=headers, data=data) return req.text def tx_committed(self, txid: int): headers = {"Accept": "application/json"} - req = httpx.get(self.root(f"tx_commited?txId={txid}"), headers=headers) + req = httpx.get(self._root(f"tx_commited?txId={txid}"), headers=headers) return req.text def latest_completed_tx(self): headers = {"Accept": "application/json"} - req = httpx.get(self.root("latest-completed-tx"), headers=headers) + req = httpx.get(self._root("latest-completed-tx"), headers=headers) return req.text def latest_submitted_tx(self): headers = {"Accept": "application/json"} - req = httpx.get(self.root("latest-submitted-tx"), headers=headers) + req = httpx.get(self._root("latest-submitted-tx"), headers=headers) return req.text def active_queries(self): headers = {"Accept": "application/json"} - req = httpx.get(self.root("active-queries"), headers=headers) + req = httpx.get(self._root("active-queries"), headers=headers) return req.text def recent_queries(self): headers = {"Accept": "application/json"} - req = httpx.get(self.root("recent-queries"), headers=headers) + req = httpx.get(self._root("recent-queries"), headers=headers) return req.text def slowest_queries(self): headers = {"Accept": "application/json"} - req = httpx.get(self.root("recent-queries"), headers=headers) + req = httpx.get(self._root("recent-queries"), headers=headers) return req.text def dispatch(xtdb, instruction): match instruction.pop(0): - case "status": - return xtdb.status() - case "query": - if instruction: - return xtdb.query(instruction[0]) - else: - return xtdb.query() case "list-keys": return xtdb.query() case "list-values": return xtdb.query("{:query {:find [(pull ?var [*])] :where [[?var :xt/id]]}}") - case "entity": - if instruction: - return xtdb.entity(instruction[0]) - case "history": - if instruction: - return xtdb.entity(instruction[0]) - case "entity-tx": - if instruction: - return xtdb.entity(instruction[0]) - case "attribute-stats": - return xtdb.attribute_stats() - case "sync": - if instruction: - return xtdb.sync(instruction[0]) - else: - return xtdb.sync() - case "await-tx": - if instruction: - return xtdb.await_tx(instruction[0]) - case "await-tx-time": - if instruction: - return xtdb.await_tx_time(instruction[0]) - case "tx-log": - return xtdb.tx_log() case "submit-tx": if instruction: return xtdb.submit_tx(instruction) - case "tx-committed": - if instruction: - return xtdb.tx_committed(instruction.pop(0)) - case "latest-completed-tx": - return xtdb.latest_completed_tx() - case "latest-submitted-tx": - return xtdb.latest_submitted_tx() - case "active-queries": - return xtdb.active_queries() - case "recent-queries": - return xtdb.recent_queries() - case "slowest-queries": - return xtdb.recent_queries() + case x: + call = getattr(xtdb, x.replace("-", "_")) + match call.__code__.co_argcount - 1: + case 1: + return call(instruction[0]) + case 0: + return call() KEYWORDS = [ - "status", - "query", - "list-keys", - "list-values", - "entity", - "history", - "entity-tx", - "attribute-stats", - "sync", - "await-tx", - "await-tx-time", - "tx-log", - "submit-tx", - "tx-committed", - "latest-completed-tx", - "latest-submitted-tx", - "active-queries", - "recent-queries", - "slowest-queries", -] + keyword.replace("_", "-") + for keyword in dir(XTDB) + if callable(getattr(XTDB, keyword)) and not keyword.startswith("_") +] + ["list-keys", "list-values"] EPILOG = """ As instructions the following keywords with arguments are supported: @@ -202,6 +149,9 @@ def dispatch(xtdb, instruction): active-queries recent-queries slowest-queries + +If no keyword is given in the initial instruction either use a dash "-" to read stdin otherwise all instructions are treated as filenames + See https://v1-docs.xtdb.com/clients/http/ for more information. OpenKAT https://openkat.nl/. From 508582a9f14e9f85ada3171178c1804d02f83e3b Mon Sep 17 00:00:00 2001 From: Benny Date: Thu, 28 Mar 2024 12:25:38 +0100 Subject: [PATCH 3/3] Ruff: oh no this line was too long --- octopoes/tools/xtdb-cli.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/octopoes/tools/xtdb-cli.py b/octopoes/tools/xtdb-cli.py index c836fdf5702..de9a61dcd45 100755 --- a/octopoes/tools/xtdb-cli.py +++ b/octopoes/tools/xtdb-cli.py @@ -150,7 +150,9 @@ def dispatch(xtdb, instruction): recent-queries slowest-queries -If no keyword is given in the initial instruction either use a dash "-" to read stdin otherwise all instructions are treated as filenames +If no keyword is given in the initial instruction either use +* a dash "-" to read stdin +* otherwise all instructions are treated as filenames See https://v1-docs.xtdb.com/clients/http/ for more information.