-
Notifications
You must be signed in to change notification settings - Fork 57
/
Copy pathincept.py
202 lines (164 loc) · 7.24 KB
/
incept.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
# -*- encoding: utf-8 -*-
"""
keri.kli.commands module
"""
import argparse
from dataclasses import dataclass
from hio.base import doing
from keri import help
from keri.app import habbing, agenting, indirecting, configing, delegating, forwarding
from keri.app.cli.common import existing, incepting, config
from keri.core import coring
# Module-level logger obtained from the keri ``help`` package.
logger = help.ogler.getLogger()

# Command line parser for this command. Argument registration order below
# determines the order in which options appear in the generated help text.
parser = argparse.ArgumentParser(description='Initialize a prefix')
# The lambda defers the lookup of handler() to call time, because handler()
# is defined further down in this module.
parser.set_defaults(handler=lambda args: handler(args))
parser.add_argument('--name', '-n', help='keystore name and file location of KERI keystore', required=True)
parser.add_argument('--base', '-b', help='additional optional prefix to file location of KERI keystore',
                    required=False, default="")
parser.add_argument('--alias', '-a', help='human readable alias for the new identifier prefix', required=True)
parser.add_argument("--config", "-c", help="directory override for configuration data")
# Boolean flag; stored on the namespace as ``endpoint`` (see ``dest``).
parser.add_argument("--receipt-endpoint", help="Attempt to connect to witness receipt endpoint for witness receipts.",
                    dest="endpoint", action='store_true')
# Defaults to the empty string (not None) when the option is omitted.
parser.add_argument("--proxy", help="alias for delegation communication proxy", default="")
# Defaults to the empty string, which mergeArgsWithFile() treats as "no file".
parser.add_argument('--file', '-f', help='Filename to use to create the identifier', default="", required=False)

# Authentication for keystore
parser.add_argument('--passcode', '-p', help='21 character encryption passcode for keystore (is not saved)',
                    dest="bran", default=None)  # passcode => bran
# NOTE(review): --aeid is parsed but not read by handler() or
# mergeArgsWithFile() in this module - confirm whether it is used elsewhere.
parser.add_argument('--aeid', help='qualified base64 of non-transferable identifier prefix for authentication '
                                   'and encryption of secrets in keystore', default=None)

# Registers further arguments defined in the shared ``incepting`` module.
# mergeArgsWithFile() reads transferable, wits, icount, isith, ncount, nsith,
# toad, est_only, data and delpre from the parsed namespace; presumably those
# are the ones added here - verify against incepting.addInceptingArgs.
incepting.addInceptingArgs(parser)
@dataclass
class InceptOptions:
    """ Options loaded from file parameter.

    Container for the inception options that mergeArgsWithFile() assembles
    from the optional ``--file`` input and the command line. handler() passes
    the attributes of the instance as keyword arguments to InceptDoer, which
    forwards them unchanged to ``hby.makeHab()``.

    The first five fields have no default and must always be supplied;
    emptyOptions() supplies None for each of them and for ``nsith``.

    NOTE(review): the per-field meanings below are inferred from the field
    names only - confirm against the makeHab() signature.
    """
    # presumably whether the identifier is transferable; set to True by the command line flag
    transferable: bool | None
    # presumably the list of witnesses; overridden when the command line gives a non-empty list
    wits: list | None
    # converted with int() when given on the command line
    icount: int | None
    # taken verbatim from the command line when given
    isith: int | str | list | None
    # converted with int() when given on the command line
    ncount: int | None
    # defaults to the string '0'; note emptyOptions() passes None explicitly instead
    nsith: int | str | list | None = '0'
    # converted with int() when given on the command line
    toad: int = 0
    # defaults to None although annotated as str; InceptDoer checks hab.kever.delpre to decide on delegation
    delpre: str = None
    # overridden by args.est_only whenever that is not None
    estOnly: bool = False
    # defaults to None although annotated as list; populated via config.parseData() from the command line
    data: list = None
def handler(args):
    """
    Build the doers that create a KERI identifier prefix in the specified
    key store under the given alias.

    Args:
        args(Namespace): arguments object from command line

    Returns:
        list: single-element list containing the configured InceptDoer
    """
    # Values taken directly from the parsed command line.
    cli = dict(
        name=args.name,
        base=args.base,
        bran=args.bran,
        alias=args.alias,
        cnfg=args.config,
        endpoint=args.endpoint,
        proxy=args.proxy,
    )

    # Inception options (file merged with command line) are passed on as
    # additional keyword arguments.
    options = vars(mergeArgsWithFile(args))

    return [InceptDoer(**cli, **options)]
def emptyOptions():
    """
    Return an InceptOptions with transferable, wits, icount, isith, ncount and
    nsith all set to None and every other field left at its default.

    Intended only for the case where the required values are passed in at the
    command line and later filled in by mergeArgsWithFile().
    """
    unset = dict.fromkeys(("transferable", "wits", "icount", "isith", "ncount", "nsith"))
    return InceptOptions(**unset)
def mergeArgsWithFile(args):
    """
    Merge options specified with command line arguments with any specified config file
    with command line arguments taking precedence.

    Args:
        args(Namespace): parsed command line arguments. Reads ``file``,
            ``transferable``, ``wits``, ``icount``, ``toad``, ``isith``,
            ``ncount``, ``nsith``, ``est_only``, ``data`` and ``delpre``.

    Returns:
        InceptOptions: options loaded from ``args.file`` (or an empty set when
        no file is given) with every value supplied on the command line
        written over the file value.
    """
    required_args = ['transferable', 'wits', 'icount', 'isith', 'ncount', 'nsith', 'toad']

    if args.file:
        incept_opts = config.loadFileOptions(args.file, InceptOptions)
    else:
        # No file (None or ''): everything must come from the command line.
        # A single truthiness test keeps the two decisions in agreement; the
        # earlier code validated the arguments for file=None but then still
        # tried to load options from None.
        config.checkRequiredArgs(args, required_args)
        incept_opts = emptyOptions()

    # The flag can only switch transferable on; a falsy flag keeps the file value.
    if args.transferable:
        incept_opts.transferable = True
    # Truthiness test also tolerates wits being None rather than an empty list.
    if args.wits:
        incept_opts.wits = args.wits
    if args.icount is not None:
        incept_opts.icount = int(args.icount)
    if args.toad is not None:
        incept_opts.toad = int(args.toad)
    if args.isith is not None:
        incept_opts.isith = args.isith
    if args.ncount is not None:
        incept_opts.ncount = int(args.ncount)
    if args.nsith is not None:
        incept_opts.nsith = args.nsith
    # NOTE(review): if the parser default for est_only is False rather than
    # None, this always replaces the value loaded from file - confirm.
    if args.est_only is not None:
        incept_opts.estOnly = args.est_only
    if args.data is not None:
        incept_opts.data = config.parseData(args.data)
    if args.delpre is not None:
        incept_opts.delpre = args.delpre

    return incept_opts
class InceptDoer(doing.DoDoer):
    """ DoDoer for creating a new identifier prefix and Hab with an alias.

    Sets up the Habery and its supporting doers, then runs inceptDo() to
    create the Hab, wait for delegation approval or witness receipts as
    needed, print the resulting prefix and public keys, and remove the
    supporting doers again.
    """

    def __init__(self, name, base, alias, bran, endpoint, proxy=None, cnfg=None, **kwa):
        """
        Parameters:
            name (str): keystore name, passed to the Configer and to existing.setupHby
            base (str): passed to existing.setupHby
            alias (str): name under which the new Hab is created
            bran (str): passcode, passed to existing.setupHby
            endpoint (bool): when truthy, witness receipts are requested through
                agenting.Receiptor instead of agenting.WitnessReceiptor
            proxy (str): alias resolved with hby.habByName and used as the
                delegation communication proxy; None skips the lookup
            cnfg (str): directory override for configuration data; None means
                no Configer is created here
            **kwa: inception options forwarded unchanged to hby.makeHab
        """
        cf = None
        # Guard on the caller-supplied directory. This previously tested the
        # imported ``config`` module, which is never None, so a Configer was
        # created unconditionally. With cnfg=None, cf stays None and
        # configuration handling is left to existing.setupHby.
        if cnfg is not None:
            cf = configing.Configer(name=name,
                                    base="",
                                    headDirPath=cnfg,
                                    temp=False,
                                    reopen=True,
                                    clear=False)

        self.endpoint = endpoint
        self.hby = existing.setupHby(name=name, base=base, bran=bran, cf=cf)
        # NOTE(review): the command line default for --proxy is '' rather than
        # None, so habByName('') is called in that case - confirm it yields None.
        self.proxy = self.hby.habByName(proxy) if proxy is not None else None
        self.hbyDoer = habbing.HaberyDoer(habery=self.hby)  # setup doer
        self.swain = delegating.Anchorer(hby=self.hby, proxy=self.proxy)
        self.postman = forwarding.Poster(hby=self.hby)
        self.mbx = indirecting.MailboxDirector(hby=self.hby, topics=['/receipt', "/replay", "/reply"])
        doers = [self.hbyDoer, self.postman, self.mbx, self.swain, doing.doify(self.inceptDo)]

        self.inits = kwa
        self.alias = alias

        super().__init__(doers=doers)

    def inceptDo(self, tymth, tock=0.0, **kwa):
        """
        Create the Hab, wait for any required approval or receipts and print
        the resulting prefix and public keys.

        Parameters:
            tymth (function): injected function wrapper closure returned by .tymen() of
                Tymist instance. Calling tymth() returns associated Tymist .tyme.
            tock (float): injected initial tock value

        Returns:  doifiable Doist compatible generator method
        """
        # enter context
        self.wind(tymth)
        self.tock = tock
        _ = (yield self.tock)

        hab = self.hby.makeHab(name=self.alias, **self.inits)

        # Both receipt doers are added up front; which one is actually used
        # depends on self.endpoint below.
        witDoer = agenting.WitnessReceiptor(hby=self.hby)
        receiptor = agenting.Receiptor(hby=self.hby)
        self.extend([witDoer, receiptor])

        if hab.kever.delpre:
            self.swain.delegation(pre=hab.pre, sn=0)
            print("Waiting for delegation approval...")
            while not self.swain.complete(hab.kever.prefixer, coring.Seqner(sn=hab.kever.sn)):
                yield self.tock

        elif hab.kever.wits:
            print("Waiting for witness receipts...")
            if self.endpoint:
                yield from receiptor.receipt(hab.pre, sn=0)
            else:
                witDoer.msgs.append(dict(pre=hab.pre))
                # Wait until the witness receiptor has produced at least one cue.
                while not witDoer.cues:
                    _ = yield self.tock

        if hab.kever.delpre:
            # Send via the proxy when one was resolved, otherwise via the new hab itself.
            if self.proxy is not None:
                sender = self.proxy
            else:
                sender = hab
            yield from self.postman.sendEventToDelegator(hab=hab, sender=sender, fn=hab.kever.sn)

        print(f'Prefix  {hab.pre}')
        for idx, verfer in enumerate(hab.kever.verfers):
            print(f'\tPublic key {idx + 1}:  {verfer.qb64}')
        print()

        # Remove the supporting doers now that inception has finished.
        toRemove = [self.hbyDoer, witDoer, self.mbx, self.swain, self.postman, receiptor]
        self.remove(toRemove)

        return