12
12
#include <string.h>
13
13
14
14
struct nni_aio_expire_q {
15
- nni_mtx eq_mtx ;
16
- nni_cv eq_cv ;
17
- nni_aio * * eq_list ;
18
- uint32_t eq_len ;
19
- uint32_t eq_cap ;
20
- nni_aio * eq_aio ; // currently expiring (task dispatch)
21
- nni_thr eq_thr ;
22
- bool eq_exit ;
15
+ nni_mtx eq_mtx ;
16
+ nni_cv eq_cv ;
17
+ nni_list eq_list ;
18
+ uint32_t eq_len ;
19
+ nni_thr eq_thr ;
20
+ nni_time eq_next ; // next expiration
21
+ bool eq_exit ;
23
22
};
24
23
25
24
static nni_aio_expire_q * * nni_aio_expire_q_list ;
117
116
nni_aio_fini (nni_aio * aio )
118
117
{
119
118
nni_aio_cancel_fn fn ;
120
- void * arg ;
119
+ void * arg ;
121
120
nni_aio_expire_q * eq = aio -> a_expire_q ;
122
121
123
122
// This is like aio_close, but we don't want to dispatch
@@ -126,10 +125,10 @@ nni_aio_fini(nni_aio *aio)
126
125
// We also wait if the aio is being expired.
127
126
nni_mtx_lock (& eq -> eq_mtx );
128
127
aio -> a_stop = true;
129
- nni_aio_expire_rm (aio );
130
- while (eq -> eq_aio == aio ) {
128
+ while (aio -> a_expiring ) {
131
129
nni_cv_wait (& eq -> eq_cv );
132
130
}
131
+ nni_aio_expire_rm (aio );
133
132
fn = aio -> a_cancel_fn ;
134
133
arg = aio -> a_cancel_arg ;
135
134
aio -> a_cancel_fn = NULL ;
@@ -203,7 +202,7 @@ nni_aio_stop(nni_aio *aio)
203
202
{
204
203
if (aio != NULL ) {
205
204
nni_aio_cancel_fn fn ;
206
- void * arg ;
205
+ void * arg ;
207
206
nni_aio_expire_q * eq = aio -> a_expire_q ;
208
207
209
208
nni_mtx_lock (& eq -> eq_mtx );
@@ -228,7 +227,7 @@ nni_aio_close(nni_aio *aio)
228
227
{
229
228
if (aio != NULL ) {
230
229
nni_aio_cancel_fn fn ;
231
- void * arg ;
230
+ void * arg ;
232
231
nni_aio_expire_q * eq = aio -> a_expire_q ;
233
232
234
233
nni_mtx_lock (& eq -> eq_mtx );
407
406
nni_aio_abort (nni_aio * aio , int rv )
408
407
{
409
408
nni_aio_cancel_fn fn ;
410
- void * arg ;
409
+ void * arg ;
411
410
nni_aio_expire_q * eq = aio -> a_expire_q ;
412
411
413
412
nni_mtx_lock (& eq -> eq_mtx );
@@ -508,125 +507,114 @@ static void
508
507
nni_aio_expire_add (nni_aio * aio )
509
508
{
510
509
nni_aio_expire_q * eq = aio -> a_expire_q ;
511
- if (eq -> eq_len >= eq -> eq_cap ) {
512
- nni_aio * * new_list =
513
- nni_zalloc (eq -> eq_cap * 2 * sizeof (nni_aio * ));
514
- for (uint32_t i = 0 ; i < eq -> eq_len ; i ++ ) {
515
- new_list [i ] = eq -> eq_list [i ];
516
- }
517
- nni_free (eq -> eq_list , eq -> eq_cap * sizeof (nni_aio * ));
518
- eq -> eq_list = new_list ;
519
- eq -> eq_cap *= 2 ;
520
- }
521
510
522
- eq -> eq_list [eq -> eq_len ++ ] = aio ;
523
- // Fire the latest aio, but it cames with performance punishment
524
- nni_cv_wake (& eq -> eq_cv );
511
+ nni_list_append (& eq -> eq_list , aio );
512
+
513
+ if (eq -> eq_next > aio -> a_expire ) {
514
+ eq -> eq_next = aio -> a_expire ;
515
+ nni_cv_wake (& eq -> eq_cv );
516
+ }
525
517
}
526
518
527
519
static void
528
520
nni_aio_expire_rm (nni_aio * aio )
529
521
{
530
- nni_aio_expire_q * eq = aio -> a_expire_q ;
531
-
532
- for (uint32_t i = 0 ; i < eq -> eq_len ; i ++ ) {
533
- if (aio == eq -> eq_list [i ]) {
534
- eq -> eq_list [i ] = eq -> eq_list [eq -> eq_len - 1 ];
535
- eq -> eq_len -- ;
536
- break ;
537
- }
538
- }
522
+ nni_list_node_remove (& aio -> a_expire_node );
539
523
540
- if (eq -> eq_len < eq -> eq_cap / 4 && eq -> eq_cap > NNI_EXPIRE_Q_SIZE ) {
541
- nni_aio * * new_list =
542
- nni_zalloc (eq -> eq_cap * sizeof (nni_aio * ) / 4 );
543
- for (uint32_t i = 0 ; i < eq -> eq_len ; i ++ ) {
544
- new_list [i ] = eq -> eq_list [i ];
545
- }
546
- nni_free (eq -> eq_list , eq -> eq_cap * sizeof (nni_aio * ));
547
- eq -> eq_list = new_list ;
548
- eq -> eq_cap /= 4 ;
549
- }
524
+ // If this item is the one that is going to wake the loop,
525
+ // don't worry about it. It will wake up normally, or when we
526
+ // add a new aio to it. Worst case is just one spurious wake up,
527
+ // which we'd need to do anyway.
550
528
}
551
529
552
530
static void
553
531
nni_aio_expire_loop (void * arg )
554
532
{
555
533
nni_aio_expire_q * q = arg ;
556
- nni_mtx * mtx = & q -> eq_mtx ;
557
- nni_cv * cv = & q -> eq_cv ;
558
- nni_aio * * list ;
534
+ nni_mtx * mtx = & q -> eq_mtx ;
535
+ nni_cv * cv = & q -> eq_cv ;
559
536
nni_time now ;
560
- uint32_t aio_idx ;
537
+ uint32_t exp_idx ;
538
+ nni_aio * expires [NNI_EXPIRE_BATCH ];
561
539
562
540
nni_thr_set_name (NULL , "nng:aio:expire" );
563
541
564
- now = nni_clock ();
565
542
nni_mtx_lock (mtx );
566
543
567
544
for (;;) {
568
545
nni_aio * aio ;
569
546
int rv ;
570
-
571
- if (q -> eq_len == 0 ) {
572
-
573
- if (q -> eq_exit ) {
574
- nni_mtx_unlock (mtx );
575
- return ;
576
- }
577
-
578
- nni_cv_wait (cv );
579
-
580
- now = nni_clock ();
547
+ nni_time next ;
548
+
549
+ next = q -> eq_next ;
550
+ now = nni_clock ();
551
+
552
+ // Each time we wake up, we scan the entire list of elements.
553
+ // We scan forward, moving up to NNI_EXPIRE_Q_SIZE elements
554
+ // (a batch) to a saved array of things we are going to cancel.
555
+ // This mostly runs in O(n), provided you don't have many
556
+ // elements (> NNI_EXPIRE_Q_SIZE) all expiring simultaneously.
557
+ aio = nni_list_first (& q -> eq_list );
558
+ if ((aio == NULL ) && (q -> eq_exit )) {
559
+ nni_mtx_unlock (mtx );
560
+ return ;
561
+ }
562
+ if (now < next ) {
563
+ // Early wake up (just to reschedule), no need to
564
+ // rescan the list. This is an optimization.
565
+ nni_cv_until (cv , next );
581
566
continue ;
582
567
}
583
-
584
- // Find the timer with min expire time.
585
- list = q -> eq_list ;
586
- aio_idx = 0 ;
587
- aio = list [aio_idx ];
588
- for (uint32_t i = 0 ; i < q -> eq_len ; i ++ ) {
589
- if (list [i ]-> a_expire < aio -> a_expire ) {
590
- aio = list [i ];
591
- aio_idx = i ;
568
+ q -> eq_next = NNI_TIME_NEVER ;
569
+ exp_idx = 0 ;
570
+ while (aio != NULL ) {
571
+ if ((aio -> a_expire < now ) &&
572
+ (exp_idx < NNI_EXPIRE_BATCH )) {
573
+ nni_aio * nxt ;
574
+
575
+ // This one is expiring.
576
+ expires [exp_idx ++ ] = aio ;
577
+ // save the next node
578
+ nxt = nni_list_next (& q -> eq_list , aio );
579
+ nni_list_remove (& q -> eq_list , aio );
580
+ // Place a temporary hold on the aio.
581
+ // This prevents it from being destroyed.
582
+ aio -> a_expiring = true;
583
+ aio = nxt ;
584
+ continue ;
592
585
}
593
- }
594
- if (now < aio -> a_expire ) {
595
- // Unexpired; we just wait for the next expired aio.
596
- nni_cv_until (cv , aio -> a_expire );
597
- now = nni_clock ();
598
- continue ;
586
+ if (aio -> a_expire < q -> eq_next ) {
587
+ q -> eq_next = aio -> a_expire ;
588
+ }
589
+ aio = nni_list_next (& q -> eq_list , aio );
599
590
}
600
591
601
- // The time has come for this aio. Expire it, canceling any
602
- // outstanding I/O.
603
- list [aio_idx ] = list [q -> eq_len - 1 ];
604
- q -> eq_len -- ;
605
- rv = aio -> a_expire_ok ? 0 : NNG_ETIMEDOUT ;
592
+ for (uint32_t i = 0 ; i < exp_idx ; i ++ ) {
593
+ aio = expires [i ];
594
+ rv = aio -> a_expire_ok ? 0 : NNG_ETIMEDOUT ;
606
595
607
- nni_aio_cancel_fn cancel_fn = aio -> a_cancel_fn ;
608
- void * cancel_arg = aio -> a_cancel_arg ;
596
+ nni_aio_cancel_fn cancel_fn = aio -> a_cancel_fn ;
597
+ void * cancel_arg = aio -> a_cancel_arg ;
609
598
610
- aio -> a_cancel_fn = NULL ;
611
- aio -> a_cancel_arg = NULL ;
612
- // Place a temporary hold on the aio. This prevents it
613
- // from being destroyed.
614
- q -> eq_aio = aio ;
599
+ aio -> a_cancel_fn = NULL ;
600
+ aio -> a_cancel_arg = NULL ;
615
601
616
- // We let the cancel function handle the completion.
617
- // If there is no cancellation function, then we cannot
618
- // terminate the aio - we've tried, but it has to run
619
- // to it's natural conclusion.
620
- nni_mtx_unlock (mtx );
621
- cancel_fn (aio , cancel_arg , rv );
622
-
623
- // Get updated time before reacquiring lock.
624
- now = nni_clock ();
625
-
626
- nni_mtx_lock (mtx );
627
-
628
- q -> eq_aio = NULL ;
602
+ // We let the cancel function handle the completion.
603
+ // If there is no cancellation function, then we cannot
604
+ // terminate the aio - we've tried, but it has to run
605
+ // to its natural conclusion.
606
+ if (cancel_fn != NULL ) {
607
+ nni_mtx_unlock (mtx );
608
+ cancel_fn (aio , cancel_arg , rv );
609
+ nni_mtx_lock (mtx );
610
+ }
611
+ aio -> a_expiring = false;
612
+ }
629
613
nni_cv_wake (cv );
614
+
615
+ if (now < q -> eq_next ) {
616
+ nni_cv_until (cv , q -> eq_next );
617
+ }
630
618
}
631
619
}
632
620
@@ -756,7 +744,6 @@ nni_aio_expire_q_free(nni_aio_expire_q *eq)
756
744
nni_mtx_unlock (& eq -> eq_mtx );
757
745
}
758
746
759
- nni_free (eq -> eq_list , eq -> eq_cap * sizeof (nni_aio * ));
760
747
nni_thr_fini (& eq -> eq_thr );
761
748
nni_cv_fini (& eq -> eq_cv );
762
749
nni_mtx_fini (& eq -> eq_mtx );
@@ -773,9 +760,8 @@ nni_aio_expire_q_alloc(void)
773
760
}
774
761
nni_mtx_init (& eq -> eq_mtx );
775
762
nni_cv_init (& eq -> eq_cv , & eq -> eq_mtx );
776
- eq -> eq_cap = NNI_EXPIRE_Q_SIZE ;
777
- eq -> eq_len = 0 ;
778
- eq -> eq_list = nni_zalloc (eq -> eq_cap * sizeof (nni_aio * ));
763
+ NNI_LIST_INIT (& eq -> eq_list , nni_aio , a_expire_node );
764
+ eq -> eq_next = NNI_TIME_NEVER ;
779
765
eq -> eq_exit = false;
780
766
781
767
if (nni_thr_init (& eq -> eq_thr , nni_aio_expire_loop , eq ) != 0 ) {
0 commit comments