@@ -264,7 +264,11 @@ void testRecoverFromJMFailover() throws Exception {
264
264
}
265
265
266
266
waitUntilWriteExecutionVertexFinishedEventPersisted (5 );
267
- runInMainThread (() -> jobEventStore .stop (false ));
267
+ runInMainThread (
268
+ () -> {
269
+ scheduler .cancel ();
270
+ jobEventStore .stop (false );
271
+ });
268
272
269
273
// register all produced partitions
270
274
registerPartitions (scheduler );
@@ -360,7 +364,11 @@ void testJobVertexUnFinishedAndOperatorCoordinatorNotSupportBatchSnapshot() thro
360
364
getCurrentAttemptIds (scheduler .getExecutionJobVertex (MIDDLE_ID ));
361
365
362
366
waitUntilWriteExecutionVertexFinishedEventPersisted (6 );
363
- runInMainThread (() -> jobEventStore .stop (false ));
367
+ runInMainThread (
368
+ () -> {
369
+ scheduler .cancel ();
370
+ jobEventStore .stop (false );
371
+ });
364
372
365
373
// register partitions, the partition of source task 0 is lost, and it will be restarted
366
374
// if middle task 0 need be restarted.
@@ -462,6 +470,7 @@ void testJobVertexFinishedAndOperatorCoordinatorNotSupportBatchSnapshotAndPartit
462
470
waitUntilWriteExecutionVertexFinishedEventPersisted (5 );
463
471
runInMainThread (
464
472
() -> {
473
+ scheduler .cancel ();
465
474
jobEventStore .stop (false );
466
475
});
467
476
@@ -548,7 +557,11 @@ void testRecoverFromJMFailoverAndPartitionsUnavailable() throws Exception {
548
557
getCurrentAttemptIds (scheduler .getExecutionJobVertex (SOURCE_ID ));
549
558
550
559
waitUntilWriteExecutionVertexFinishedEventPersisted (5 );
551
- runInMainThread (() -> jobEventStore .stop (false ));
560
+ runInMainThread (
561
+ () -> {
562
+ scheduler .cancel ();
563
+ jobEventStore .stop (false );
564
+ });
552
565
553
566
int losePartitionsTaskIndex = 0 ;
554
567
@@ -619,7 +632,11 @@ void testRecoverDecidedParallelismFromTheSameJobGraphInstance() throws Exception
619
632
getCurrentAttemptIds (scheduler .getExecutionJobVertex (SINK_ID ));
620
633
621
634
waitUntilWriteExecutionVertexFinishedEventPersisted (12 );
622
- runInMainThread (() -> jobEventStore .stop (false ));
635
+ runInMainThread (
636
+ () -> {
637
+ scheduler .cancel ();
638
+ jobEventStore .stop (false );
639
+ });
623
640
624
641
// start a new scheduler and try to recover.
625
642
AdaptiveBatchScheduler newScheduler = createScheduler (jobGraph );
@@ -682,6 +699,7 @@ void testPartitionNotFoundTwiceAfterJMFailover() throws Exception {
682
699
waitUntilWriteExecutionVertexFinishedEventPersisted (5 );
683
700
runInMainThread (
684
701
() -> {
702
+ scheduler .cancel ();
685
703
jobEventStore .stop (false );
686
704
});
687
705
0 commit comments