Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Blockstream test tool #949

Merged
merged 7 commits into from
Feb 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions packages/augur-tools/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# augur-tools

## blockstream-console

Blockstream console is a simple utility which spins up blockstream, configures it through either [ethers](https://github.com/ethers-io/ethers.js/) or [ethrpc](https://github.com/ethereumjs/ethrpc), and emits the received block and log notifications directly to the console
Event callbacks are emitted via stdout, debug logs via stderr.
### Usage:
```angular2
yarn blockstream-console
Querying block: latest
[ethrpc] eth_subscribe request failed, fall back to polling for blocks: Method not found
Finished querying block: latest
Querying block: 0x6d44a0
Finished querying block: 0x6d44a0
Querying logs: {"blockHash":"0x476cb3978411be71f8b613c71921ae63163a4eae9a1e561163a330b35d149945","address":"0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"}
Finished querying logs {"blockHash":"0x476cb3978411be71f8b613c71921ae63163a4eae9a1e561163a330b35d149945","address":"0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"} (0)
BLOCK Added 7160992 0x476cb3978411be71f8b613c71921ae63163a4eae9a1e561163a330b35d149945
Querying logs: {"blockHash":"0x98df0e5b5cfd14dbc55ecae6e14217357beb98dabaf5285229a52c35484e0157","address":"0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"}
Finished querying logs {"blockHash":"0x98df0e5b5cfd14dbc55ecae6e14217357beb98dabaf5285229a52c35484e0157","address":"0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"} (0)
Querying logs: {"blockHash":"0x0b445bea8a03f73ac3266282a9df23945e0fb50603e1668c639165f0727aefc2","address":"0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"}
Finished querying logs {"blockHash":"0x0b445bea8a03f73ac3266282a9df23945e0fb50603e1668c639165f0727aefc2","address":"0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"} (0)
Querying logs: {"blockHash":"0x9fd2165541af79ee3ed201aabbf3b8f552d84ac47c80803e527f38819913c42c","address":"0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"}

```
or without debug info
```
yarn blockstream-console 2> /dev/null
[ethrpc] eth_subscribe request failed, fall back to polling for blocks: Method not found
BLOCK Added 7161001 0xb623dbadef00bdf42eb680498b3f8fd5be2ac01e5db3fee2127e5159b302625b
BLOCK Added 7161002 0xbaf1d0dfb5e87f1f3f15a79dbc3fa706ff305c3d13b6e6225272b786e1fda157
┗ Added LOGS (1) [14] to block 0xbaf1d0dfb5e87f1f3f15a79dbc3fa706ff305c3d13b6e6225272b786e1fda157

BLOCK Added 7161003 0x71b6995194f41aa2d62af227c5bb7113c3159a924755ba0fbb12890d27163630
BLOCK Added 7161004 0x539f771644f07b9ce89518bf07277be6fdc62b7c20fc054f6405ba83a37854b8
┗ Added LOGS (1) [46] to block 0x539f771644f07b9ce89518bf07277be6fdc62b7c20fc054f6405ba83a37854b8
```

or against ethers instead of ethrpc (default):
```
$ ADAPTER_TYPE=ethers yarn blockstream-console 2> /dev/null
BLOCK Added 7161001 0xb623dbadef00bdf42eb680498b3f8fd5be2ac01e5db3fee2127e5159b302625b
BLOCK Added 7161002 0xbaf1d0dfb5e87f1f3f15a79dbc3fa706ff305c3d13b6e6225272b786e1fda157
BLOCK Added 7161003 0x71b6995194f41aa2d62af227c5bb7113c3159a924755ba0fbb12890d27163630
BLOCK Added 7161004 0x539f771644f07b9ce89518bf07277be6fdc62b7c20fc054f6405ba83a37854b8
BLOCK Added 7161005 0x57d5e9e08bced4d5bff3dc3f83a428bb8480320d805544804d25739b71124306
BLOCK Added 7161006 0x0ae9453a8299d3a55048e94d89eefe5374236ebb9c24763e7b6fb1f817052a19
BLOCK Added 7161007 0xf07a998636b607d653798859b93d46a679968405650c6019a8baa00e60f80968
┗ Added LOGS (3) [88,128,131] to block 0xf07a998636b607d653798859b93d46a679968405650c6019a8baa00e60f80968

BLOCK Added 7161008 0x4a4cb87bad063b428ef0853bda7dee7dc9b00f21e82daa97ef40e30f6f414a6c

```
15 changes: 12 additions & 3 deletions packages/augur-tools/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
"docker:geth:pop-normal-time": "docker-builder/run.sh augurproject/dev-pop-normtime-geth-v2",
"docker:geth:attach": "docker run --rm --net host -it ethereum/client-go attach rpc:http://127.0.0.1:8545",
"docker:geth:files": "./scripts/copy-docker-files.sh augurproject/dev-pop-geth:core-$(node scripts/core-version.js)",
"flash": "node ./scripts/flash"
"flash": "node ./scripts/flash",
"blockstream-console": "ts-node ./scripts/blockstream-console.ts"
},
"repository": {
"type": "git",
Expand All @@ -38,8 +39,16 @@
"url": "https://github.com/augurproject/augur/issues"
},
"homepage": "https://github.com/augurproject/augur#readme",
"dependencies": {},
"dependencies": {
"options-parser": "0.4.0",
"ethereumjs-blockstream": "7.0.0",
"ethers": "4.0.23",
"ethrpc": "6.1.3",
"lodash": "4.17.11"
},
"devDependencies": {
"options-parser": "0.4.0"
"options-parser": "0.4.0",
"ts-node": "7.0.1",
"typescript": "3.2.1"
}
}
47 changes: 47 additions & 0 deletions packages/augur-tools/scripts/blockstream-adapters/ethers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import { Block, Log, FilterOptions } from "ethereumjs-blockstream";
import * as _ from "lodash";
import { ethers } from "ethers";
import { Dependencies } from "./index";

function convertEthersBlockToBlockstreamBlock(block: ethers.providers.Block): Block {
return {
number: "0x" + block.number.toString(16),
hash: block.hash,
parentHash: block.parentHash,
}
}

function convertEthersLogToBlockstreamLog(log: ethers.providers.Log): Log {
return {
logIndex: (log.logIndex || "0").toString(),
blockNumber: (log.blockNumber || "0").toString(),
blockHash: log.blockHash || "0",
}
}

async function getBlockByHashOrTag(provider: ethers.providers.Provider, hashOrTag: string): Promise<Block> {
console.error("Querying block: " + hashOrTag)
const block = await provider.getBlock(hashOrTag, false);
console.error("Finished querying block: " + hashOrTag)
return convertEthersBlockToBlockstreamBlock(block);
}

async function getLogs(provider: ethers.providers.Provider, filterOptions: FilterOptions): Promise<Log[]> {
console.error("Querying logs " + JSON.stringify(filterOptions));
const logs = await provider.getLogs({
...filterOptions,
topics: _.compact(filterOptions.topics),
});
console.error(`Finished querying logs ${JSON.stringify(filterOptions)} (${logs.length})`);

return _.map(logs, convertEthersLogToBlockstreamLog);
}

export default async function connect(httpAddress: string): Promise<Dependencies> {
const provider = new ethers.providers.JsonRpcProvider(httpAddress)
return {
getBlockByNumber: _.partial(getBlockByHashOrTag, provider),
getBlockByHash: _.partial(getBlockByHashOrTag, provider),
getLogs: _.partial(getLogs, provider),
}
}
54 changes: 54 additions & 0 deletions packages/augur-tools/scripts/blockstream-adapters/ethrpc.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import { Block, Log, FilterOptions } from "ethereumjs-blockstream";
import { Dependencies } from "./index";
const ethrpc = require("ethrpc");

function getBlockByHash(hash: string): Promise<Block> {
return new Promise<Block>(function (resolve, reject) {
ethrpc.getBlockByHash(hash, false, function (err: Error | null, block: Block | undefined) {
if (err) return reject(err);
resolve(block);
});
});
}

function getBlockByNumber(number: string): Promise<Block> {
return new Promise<Block>(function (resolve, reject) {
console.error("Querying block: " + number)
ethrpc.getBlockByNumber(number, false, function (err: Error | null, block: Block | undefined) {
if (err) return reject(err);
console.error("Finished querying block: " + number);
resolve(block);
});
});
}

function getLogs(filterOptions: FilterOptions): Promise<Log[]> {
return new Promise((resolve, reject) => {
console.error("Querying logs: " + JSON.stringify(filterOptions));
ethrpc.getLogs(filterOptions, (err: Error|null, logs: Log[]|undefined) => {
if (err) return reject(err);
if (logs === undefined) return reject(new Error("Logs undefined for filter " + JSON.stringify(filterOptions)));
console.error(`Finished querying logs ${JSON.stringify(filterOptions)} (${logs.length})`);
resolve(logs);
});
});
}

export default function connect(httpAddress: string): Promise<Dependencies> {
return new Promise<Dependencies>((resolve, reject) => {
const configuration = {
httpAddresses: [httpAddress],
wsAddresses: [],
ipcAddresses: [],
};
ethrpc.connect(
configuration, function (err: Error | undefined) {
if (err) return reject(err);
resolve({
getBlockByNumber,
getBlockByHash,
getLogs,
});
});
});
}
20 changes: 20 additions & 0 deletions packages/augur-tools/scripts/blockstream-adapters/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import ethrpc from "./ethrpc";
import ethers from "./ethers";
export { ethrpc, ethers };
import { Block, FilterOptions, Log } from "ethereumjs-blockstream";

export type GetBlockByString = (hash: string) => Promise<Block | null>

export interface Dependencies {
getBlockByNumber: GetBlockByString,
getBlockByHash: GetBlockByString,
getLogs: (filterOptions: FilterOptions) => Promise<Log[]>,
}

export type SUPPORTED_ADAPTER = "ethrpc" | "ethers";
export const SUPPORTED_ADAPTER = ["ethrpc", "ethers"];

export function isSupportedAdapter(adapterName: string): adapterName is SUPPORTED_ADAPTER {
if (SUPPORTED_ADAPTER.includes(adapterName)) return true;
return false;
}
76 changes: 76 additions & 0 deletions packages/augur-tools/scripts/blockstream-console.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import { Block, BlockAndLogStreamer, FilterOptions, Log } from "ethereumjs-blockstream";
import * as adapters from "./blockstream-adapters";
import { GetBlockByString, isSupportedAdapter, SUPPORTED_ADAPTER } from "./blockstream-adapters";

const POLLING_FREQUENCY = parseInt(process.env.POLLING_FREQUENCY || "3000");;
const STARTUP_BLOCKS = parseInt(process.env.STARTUP_BLOCKS || "5");
const ETHEREUM_HTTP = process.env.ETHEREUM_HTTP || "http://127.0.0.1:8545";
const ADAPTER_TYPE = process.env.ADAPTER_TYPE || "ethrpc";
const LOG_FILTER = {
address: process.env.FILTER_ADDRESS || "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"
};


function startPollingForBlocks(blockstream: BlockAndLogStreamer<Block, Log>, getBlockByNumber: GetBlockByString) {
setInterval(async function () {
let block = await getBlockByNumber("latest");
if (block === null) return console.warn("bad block");
blockstream.reconcileNewBlock(block);
}, POLLING_FREQUENCY);
}

function describeLogs(blockHash: string, logs: Log[]) {
return `(${logs.length}) [${logs.map(log => parseInt(log.logIndex, 16))}] to block ${blockHash}`;
}

function describeBlock(block: Block) {
return `${parseInt(block.number, 16)} ${block.hash}`;
}

function setupLogging(blockstream: BlockAndLogStreamer<Block, Log>) {
blockstream.addLogFilter(LOG_FILTER)

blockstream.subscribeToOnBlockAdded((block: Block) => {
console.log("BLOCK Added " + describeBlock(block))
});
blockstream.subscribeToOnBlockRemoved((block: Block) => {
console.log("BLOCK Removed " + describeBlock(block))
});
blockstream.subscribeToOnLogsAdded((blockHash, logs) => {
if (logs.length > 0) {
console.log(" ┗ Added LOGS " + describeLogs(blockHash, logs) + "\n");
}
});
blockstream.subscribeToOnLogsRemoved((blockHash, logs) => {
if (logs.length > 0) {
console.log("\n ┏ Removed LOGS " + describeLogs(blockHash, logs));
}
});
}

function getBlockBehind(blockNumber: string, howManyBlocks: number) {
return "0x" + (parseInt(blockNumber, 16) - howManyBlocks).toString(16);
}

async function doStuff(adapterType: SUPPORTED_ADAPTER) {
const dependencies = await adapters[adapterType](ETHEREUM_HTTP);
const blockstream = new BlockAndLogStreamer(dependencies.getBlockByHash, dependencies.getLogs, console.warn);

const block = await dependencies.getBlockByNumber("latest");
if (block === null) throw new Error("Could not get latest block");

const fromBlockNumber = getBlockBehind(block.number, STARTUP_BLOCKS);
const fromBlock = await dependencies.getBlockByNumber(fromBlockNumber);
if (fromBlock === null) throw new Error("Could not get block " + fromBlockNumber);

setupLogging(blockstream);
await blockstream.reconcileNewBlock(fromBlock);
await blockstream.reconcileNewBlock(block);
startPollingForBlocks(blockstream, dependencies.getBlockByNumber);
}

if (isSupportedAdapter(ADAPTER_TYPE)) {
doStuff(ADAPTER_TYPE);
} else {
console.error("Use supported ADAPTER_TYPE: " + SUPPORTED_ADAPTER)
}
Loading