1
1
use anyhow:: Result ;
2
+ use futures:: { Future , TryFutureExt } ;
3
+ use sqlx:: { AnyConnection , Connection } ;
4
+ use std:: io;
5
+ use std:: time:: Duration ;
2
6
3
- use crate :: opt:: { Command , DatabaseCommand , MigrateCommand } ;
7
+ use crate :: opt:: { Command , ConnectOpts , DatabaseCommand , MigrateCommand } ;
4
8
5
9
mod database;
6
10
// mod migration;
@@ -23,11 +27,11 @@ pub async fn run(opt: Opt) -> Result<()> {
23
27
source,
24
28
dry_run,
25
29
ignore_missing,
26
- database_url ,
30
+ connect_opts ,
27
31
} => {
28
32
migrate:: run (
29
33
source. resolve ( & migrate. source ) ,
30
- & database_url ,
34
+ & connect_opts ,
31
35
dry_run,
32
36
* ignore_missing,
33
37
)
@@ -37,56 +41,98 @@ pub async fn run(opt: Opt) -> Result<()> {
37
41
source,
38
42
dry_run,
39
43
ignore_missing,
40
- database_url ,
44
+ connect_opts ,
41
45
} => {
42
46
migrate:: revert (
43
47
source. resolve ( & migrate. source ) ,
44
- & database_url ,
48
+ & connect_opts ,
45
49
dry_run,
46
50
* ignore_missing,
47
51
)
48
52
. await ?
49
53
}
50
54
MigrateCommand :: Info {
51
55
source,
52
- database_url ,
53
- } => migrate:: info ( source. resolve ( & migrate. source ) , & database_url ) . await ?,
56
+ connect_opts ,
57
+ } => migrate:: info ( source. resolve ( & migrate. source ) , & connect_opts ) . await ?,
54
58
MigrateCommand :: BuildScript { source, force } => {
55
59
migrate:: build_script ( source. resolve ( & migrate. source ) , force) ?
56
60
}
57
61
} ,
58
62
59
63
Command :: Database ( database) => match database. command {
60
- DatabaseCommand :: Create { database_url } => database:: create ( & database_url ) . await ?,
64
+ DatabaseCommand :: Create { connect_opts } => database:: create ( & connect_opts ) . await ?,
61
65
DatabaseCommand :: Drop {
62
66
confirmation,
63
- database_url ,
64
- } => database:: drop ( & database_url , !confirmation) . await ?,
67
+ connect_opts ,
68
+ } => database:: drop ( & connect_opts , !confirmation. yes ) . await ?,
65
69
DatabaseCommand :: Reset {
66
70
confirmation,
67
71
source,
68
- database_url ,
69
- } => database:: reset ( & source, & database_url , !confirmation) . await ?,
72
+ connect_opts ,
73
+ } => database:: reset ( & source, & connect_opts , !confirmation. yes ) . await ?,
70
74
DatabaseCommand :: Setup {
71
75
source,
72
- database_url ,
73
- } => database:: setup ( & source, & database_url ) . await ?,
76
+ connect_opts ,
77
+ } => database:: setup ( & source, & connect_opts ) . await ?,
74
78
} ,
75
79
76
80
Command :: Prepare {
77
81
check : false ,
78
82
merged,
79
83
args,
80
- database_url ,
81
- } => prepare:: run ( & database_url , merged, args) ?,
84
+ connect_opts ,
85
+ } => prepare:: run ( & connect_opts , merged, args) . await ?,
82
86
83
87
Command :: Prepare {
84
88
check : true ,
85
89
merged,
86
90
args,
87
- database_url ,
88
- } => prepare:: check ( & database_url , merged, args) ?,
91
+ connect_opts ,
92
+ } => prepare:: check ( & connect_opts , merged, args) . await ?,
89
93
} ;
90
94
91
95
Ok ( ( ) )
92
96
}
97
+
98
+ /// Attempt to connect to the database server, retrying up to `ops.connect_timeout`.
99
+ async fn connect ( opts : & ConnectOpts ) -> sqlx:: Result < AnyConnection > {
100
+ retry_connect_errors ( opts, AnyConnection :: connect) . await
101
+ }
102
+
103
+ /// Attempt an operation that may return errors like `ConnectionRefused`,
104
+ /// retrying up until `ops.connect_timeout`.
105
+ ///
106
+ /// The closure is passed `&ops.database_url` for easy composition.
107
+ async fn retry_connect_errors < ' a , F , Fut , T > (
108
+ opts : & ' a ConnectOpts ,
109
+ mut connect : F ,
110
+ ) -> sqlx:: Result < T >
111
+ where
112
+ F : FnMut ( & ' a str ) -> Fut ,
113
+ Fut : Future < Output = sqlx:: Result < T > > + ' a ,
114
+ {
115
+ backoff:: future:: retry (
116
+ backoff:: ExponentialBackoffBuilder :: new ( )
117
+ . with_max_elapsed_time ( Some ( Duration :: from_secs ( opts. connect_timeout ) ) )
118
+ . build ( ) ,
119
+ || {
120
+ connect ( & opts. database_url ) . map_err ( |e| -> backoff:: Error < sqlx:: Error > {
121
+ match e {
122
+ sqlx:: Error :: Io ( ref ioe) => match ioe. kind ( ) {
123
+ io:: ErrorKind :: ConnectionRefused
124
+ | io:: ErrorKind :: ConnectionReset
125
+ | io:: ErrorKind :: ConnectionAborted => {
126
+ return backoff:: Error :: transient ( e) ;
127
+ }
128
+ _ => ( ) ,
129
+ } ,
130
+ _ => ( ) ,
131
+ }
132
+
133
+ backoff:: Error :: permanent ( e)
134
+ } )
135
+ } ,
136
+ )
137
+ . await
138
+ }
0 commit comments