Skip to content

API documentation of the Dropbox provider

The module that contains all the necessary logic for communication with the Dropbox providers.

DropboxCore

Bases: StorageCore

Base class that creates the most important functions for the Dropbox storage provider.

Source code in src/sqooler/storage_providers/dropbox.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
class DropboxCore(StorageCore):
    """
    Base class that creates the most important functions for the Dropbox storage provider.

    All files are json files that are addressed as `/<storage_path>/<job_id>.json`. Every
    operation opens its own `dropbox.Dropbox` client from the stored credentials.
    """

    def __init__(
        self, login_dict: DropboxLoginInformation, name: str, is_active: bool = True
    ) -> None:
        """
        Args:
            login_dict: The login information, which provides the app key, the app secret
                and the refresh token
            name: The name of the storage provider
            is_active: Is the storage provider active.
        """

        super().__init__(name, is_active)
        self.app_key = login_dict.app_key
        self.app_secret = login_dict.app_secret
        self.refresh_token = login_dict.refresh_token

    def _connect(self) -> "dropbox.Dropbox":
        """
        Create an instance of the Dropbox class, which can make requests to the API.

        Returns:
            The client built from the stored credentials. All callers use it in a
            `with` statement.
        """
        return dropbox.Dropbox(
            app_key=self.app_key,
            app_secret=self.app_secret,
            oauth2_refresh_token=self.refresh_token,
        )

    @staticmethod
    def _json_path(storage_path: str, job_id: str) -> str:
        """
        Build the full Dropbox path of a json file.

        Args:
            storage_path: the path of the folder, but excluding the file name
            job_id: the name of the file without the .json extension

        Returns:
            The path in the form `/<storage_path>/<job_id>.json`
        """
        # strip trailing and leading slashes so that the separators are never doubled
        return "/" + storage_path.strip("/") + "/" + job_id + ".json"

    def upload_string(
        self, content_string: str, storage_path: str, job_id: str
    ) -> None:
        """
        Upload the content_string as a json file to the dropbox. An existing file under the
        same path is overwritten.

        Args:
            content_string: the content of the file that should be uploaded
            storage_path: the path where the file should be stored, but excluding the file name
            job_id: the name of the file without the .json extension
        """

        full_path = self._json_path(storage_path, job_id)

        with self._connect() as dbx:
            # Check that the access token is valid
            dbx.users_get_current_account()
            dbx.files_upload(
                content_string.encode("utf-8"), full_path, mode=WriteMode("overwrite")
            )

    @validate_active
    def upload(self, content_dict: Mapping, storage_path: str, job_id: str) -> None:
        """
        Upload the content_dict as a json file to the dropbox

        Args:
            content_dict: the content of the file that should be uploaded
            storage_path: the path where the file should be stored, but excluding the file name
            job_id: the name of the file without the .json extension

        Raises:
            FileExistsError: if a file can already be read under this path
        """

        # let us first see if the file already exists by using the get function
        # it would be much nicer to use an exists function, but we do not have that
        try:
            self.get(storage_path, job_id)
        except FileNotFoundError:
            # the path is free, so create the appropriate string for the dropbox API
            dump_str = json.dumps(content_dict, default=datetime_handler)
            self.upload_string(dump_str, storage_path, job_id)
        else:
            raise FileExistsError(
                f"The file {job_id} in {storage_path} already exists and should not be overwritten."
            )

    @validate_active
    def get(self, storage_path: str, job_id: str) -> dict:
        """
        Get the file content from the dropbox

        Args:
            storage_path: the path where the file is stored, but excluding the file name
            job_id: the name of the file without the .json extension

        Returns:
            The decoded json content of the file

        Raises:
            FileNotFoundError: if the download fails with an ApiError
            SystemExit: if the account check fails with an AuthError
        """

        full_path = self._json_path(storage_path, job_id)

        with self._connect() as dbx:
            # Check that the access token is valid
            try:
                dbx.users_get_current_account()
            except AuthError:
                # NOTE(review): this terminates the whole process from library code. It is
                # kept for backward compatibility - consider raising an exception instead.
                sys.exit("ERROR: Invalid access token.")
            try:
                _, res = dbx.files_download(path=full_path)
            except ApiError as err:
                raise FileNotFoundError(
                    f"Could not find file under {full_path}"
                ) from err
            data = res.content
        return json.loads(data.decode("utf-8"))

    @validate_active
    def update(self, content_dict: dict, storage_path: str, job_id: str) -> None:
        """
        Update the file content.

        Args:
            content_dict: The dictionary containing the new content of the file
            storage_path: The path to the file
            job_id: The id of the job

        Returns:
            None

        Raises:
            FileNotFoundError: if the metadata lookup of the file fails with an ApiError
        """
        # create the appropriate string for the dropbox API
        dump_str = json.dumps(content_dict, default=datetime_handler)

        full_path = self._json_path(storage_path, job_id)

        with self._connect() as dbx:
            # Check that the access token is valid
            dbx.users_get_current_account()

            # only files that are already there may be updated
            try:
                dbx.files_get_metadata(full_path)
            except ApiError as err:
                raise FileNotFoundError(
                    f"Could not update file under {full_path}"
                ) from err

            dbx.files_upload(
                dump_str.encode("utf-8"), full_path, mode=WriteMode("overwrite")
            )

    @validate_active
    def move(self, start_path: str, final_path: str, job_id: str) -> None:
        """
        Move the file from start_path to final_path

        Args:
            start_path: the path where the file is currently stored, but excluding the file name
            final_path: the path where the file should be stored, but excluding the file name
            job_id: the name of the file without the .json extension

        Returns:
            None
        """
        full_start_path = self._json_path(start_path, job_id)
        full_final_path = self._json_path(final_path, job_id)

        with self._connect() as dbx:
            dbx.users_get_current_account()
            dbx.files_move_v2(full_start_path, full_final_path)

    @validate_active
    def delete(self, storage_path: str, job_id: str) -> None:
        """
        Remove the file from the dropbox

        Args:
            storage_path: the path where the file is stored, but excluding the file name
            job_id: the name of the file without the .json extension

        Returns:
            None

        Raises:
            FileNotFoundError: if the deletion fails with an ApiError
            SystemExit: if the account check fails with an AuthError
        """

        full_path = self._json_path(storage_path, job_id)

        with self._connect() as dbx:
            # Check that the access token is valid
            try:
                dbx.users_get_current_account()
            except AuthError:
                # NOTE(review): this terminates the whole process from library code. It is
                # kept for backward compatibility - consider raising an exception instead.
                sys.exit("ERROR: Invalid access token.")

            try:
                dbx.files_delete_v2(path=full_path)
            except ApiError as err:
                raise FileNotFoundError(
                    f"Could not delete file under {full_path}"
                ) from err

    def delete_folder(self, folder_path: str) -> None:
        """
        Remove the folder from the dropbox. Attention: this will remove all the files in the
        folder. It is not a standard function for storage providers, but allows us to better
        clean up the tests.

        Args:
            folder_path: the path of the folder that should be removed

        Returns:
            None

        Raises:
            SystemExit: if the account check fails with an AuthError
        """

        # to remove a folder there must be no trailing slash
        full_path = "/" + folder_path.strip("/")

        with self._connect() as dbx:
            # Check that the access token is valid
            try:
                dbx.users_get_current_account()
            except AuthError:
                # NOTE(review): this terminates the whole process from library code. It is
                # kept for backward compatibility - consider raising an exception instead.
                sys.exit("ERROR: Invalid access token.")

            dbx.files_delete_v2(path=full_path)

__init__(login_dict, name, is_active=True)

Parameters:

Name Type Description Default
login_dict DropboxLoginInformation

The dictionary that contains the login information

required
name str

The name of the storage provider

required
is_active bool

Is the storage provider active.

True
Source code in src/sqooler/storage_providers/dropbox.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def __init__(
    self,
    login_dict: DropboxLoginInformation,
    name: str,
    is_active: bool = True,
) -> None:
    """
    Set up the provider and keep the Dropbox credentials on the instance.

    Args:
        login_dict: The login information, which provides the app key, the app secret
            and the refresh token
        name: The name of the storage provider
        is_active: Is the storage provider active.
    """
    super().__init__(name, is_active)

    # the credentials are needed for every client that is opened later on
    self.app_key = login_dict.app_key
    self.app_secret = login_dict.app_secret
    self.refresh_token = login_dict.refresh_token

delete(storage_path, job_id)

Remove the file from the dropbox

Parameters:

Name Type Description Default
storage_path str

the path where the file should be stored, but excluding the file name

required
job_id str

the name of the file. Is a json file

required

Returns:

Type Description
None

None

Source code in src/sqooler/storage_providers/dropbox.py
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
@validate_active
def delete(self, storage_path: str, job_id: str) -> None:
    """
    Delete the json file of a job from the dropbox.

    Args:
        storage_path: the path of the folder that contains the file, excluding the file name
        job_id: the name of the file without the .json extension

    Returns:
        None

    Raises:
        FileNotFoundError: if the deletion fails with an ApiError
        SystemExit: if the account check fails with an AuthError
    """
    # no leading or trailing slashes, the separators are added below
    folder = storage_path.strip("/")

    client = dropbox.Dropbox(
        app_key=self.app_key,
        app_secret=self.app_secret,
        oauth2_refresh_token=self.refresh_token,
    )
    with client as dbx:
        # make sure that the access token is valid before touching any file
        try:
            dbx.users_get_current_account()
        except AuthError:
            sys.exit("ERROR: Invalid access token.")

        file_name = job_id + ".json"
        target = f"/{folder}/{file_name}"
        try:
            dbx.files_delete_v2(path=target)
        except ApiError as err:
            raise FileNotFoundError(f"Could not delete file under {target}") from err

delete_folder(folder_path)

Remove the folder from the dropbox. Attention: this will remove all the files in the folder. It is not a standard function for storage providers, but it allows us to better clean up the tests.

Parameters:

Name Type Description Default
folder_path str

the path of the folder that should be removed

required

Returns:

Type Description
None

None

Source code in src/sqooler/storage_providers/dropbox.py
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
def delete_folder(self, folder_path: str) -> None:
    """
    Delete a whole folder from the dropbox, including all the files in it.

    This is not a standard function for storage providers, but it allows us to better
    clean up the tests.

    Args:
        folder_path: the path of the folder that should be removed

    Returns:
        None

    Raises:
        SystemExit: if the account check fails with an AuthError
    """
    # no leading or trailing slashes, the leading one is added below
    folder = folder_path.strip("/")

    client = dropbox.Dropbox(
        app_key=self.app_key,
        app_secret=self.app_secret,
        oauth2_refresh_token=self.refresh_token,
    )
    with client as dbx:
        # make sure that the access token is valid before deleting anything
        try:
            dbx.users_get_current_account()
        except AuthError:
            sys.exit("ERROR: Invalid access token.")

        # to remove a folder there must be no trailing slash
        dbx.files_delete_v2(path=f"/{folder}")

get(storage_path, job_id)

Get the file content from the dropbox

storage_path: the path where the file is stored, but excluding the file name.

job_id: the name of the file. It is a json file.

Source code in src/sqooler/storage_providers/dropbox.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
@validate_active
def get(self, storage_path: str, job_id: str) -> dict:
    """
    Download the json file of a job from the dropbox and decode it.

    Args:
        storage_path: the path of the folder that contains the file, excluding the file name
        job_id: the name of the file without the .json extension

    Returns:
        The decoded json content of the file

    Raises:
        FileNotFoundError: if the download fails with an ApiError
        SystemExit: if the account check fails with an AuthError
    """
    # no leading or trailing slashes, the separators are added below
    folder = storage_path.strip("/")

    client = dropbox.Dropbox(
        app_key=self.app_key,
        app_secret=self.app_secret,
        oauth2_refresh_token=self.refresh_token,
    )
    with client as dbx:
        # make sure that the access token is valid before downloading
        try:
            dbx.users_get_current_account()
        except AuthError:
            sys.exit("ERROR: Invalid access token.")

        file_name = job_id + ".json"
        target = f"/{folder}/{file_name}"
        try:
            _metadata, response = dbx.files_download(path=target)
        except ApiError as err:
            raise FileNotFoundError(f"Could not find file under {target}") from err
        raw_bytes = response.content

    # the decoding happens after the client has been closed
    text = raw_bytes.decode("utf-8")
    return json.loads(text)

move(start_path, final_path, job_id)

Move the file from start_path to final_path

start_path: the path where the file is currently stored, but excluding the file name.

final_path: the path where the file should be stored, but excluding the file name.

job_id: the name of the file. It is a json file.

Returns:

Type Description
None

None

Source code in src/sqooler/storage_providers/dropbox.py
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
@validate_active
def move(self, start_path: str, final_path: str, job_id: str) -> None:
    """
    Move the json file of a job from one folder to another one.

    Args:
        start_path: the path where the file is currently stored, but excluding the file name
        final_path: the path where the file should be stored, but excluding the file name
        job_id: the name of the file without the .json extension

    Returns:
        None
    """
    # no leading or trailing slashes, the separators are added below
    source_folder = start_path.strip("/")
    target_folder = final_path.strip("/")

    client = dropbox.Dropbox(
        app_key=self.app_key,
        app_secret=self.app_secret,
        oauth2_refresh_token=self.refresh_token,
    )
    with client as dbx:
        dbx.users_get_current_account()

        # the file keeps its name, only the folder changes
        file_name = job_id + ".json"
        source = f"/{source_folder}/{file_name}"
        target = f"/{target_folder}/{file_name}"
        dbx.files_move_v2(source, target)

update(content_dict, storage_path, job_id)

Update the file content.

Parameters:

Name Type Description Default
content_dict dict

The dictionary containing the new content of the file

required
storage_path str

The path to the file

required
job_id str

The id of the job

required

Returns:

Type Description
None

None

Source code in src/sqooler/storage_providers/dropbox.py
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
@validate_active
def update(self, content_dict: dict, storage_path: str, job_id: str) -> None:
    """
    Overwrite the content of a json file that already exists in the dropbox.

    Args:
        content_dict: The dictionary containing the new content of the file
        storage_path: The path to the file
        job_id: The id of the job

    Returns:
        None

    Raises:
        FileNotFoundError: if the metadata lookup of the file fails with an ApiError
    """
    # serialize first, so that nothing is sent if the content cannot be dumped
    payload = json.dumps(content_dict, default=datetime_handler)

    # no leading or trailing slashes, the separators are added here
    folder = storage_path.strip("/")
    file_name = job_id + ".json"
    target = f"/{folder}/{file_name}"

    client = dropbox.Dropbox(
        app_key=self.app_key,
        app_secret=self.app_secret,
        oauth2_refresh_token=self.refresh_token,
    )
    with client as dbx:
        # make sure that the access token is valid
        dbx.users_get_current_account()

        # only files that are already there may be updated
        try:
            dbx.files_get_metadata(target)
        except ApiError as err:
            raise FileNotFoundError(f"Could not update file under {target}") from err

        encoded = payload.encode("utf-8")
        dbx.files_upload(encoded, target, mode=WriteMode("overwrite"))

upload(content_dict, storage_path, job_id)

Upload the content_dict as a json file to the dropbox

Parameters:

Name Type Description Default
content_dict Mapping

the content of the file that should be uploaded

required
storage_path str

the path where the file should be stored, but excluding the file name

required
job_id str

the name of the file without the .json extension

required
Source code in src/sqooler/storage_providers/dropbox.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
@validate_active
def upload(self, content_dict: Mapping, storage_path: str, job_id: str) -> None:
    """
    Store the content_dict as a new json file in the dropbox.

    Args:
        content_dict: the content of the file that should be uploaded
        storage_path: the path where the file should be stored, but excluding the file name
        job_id: the name of the file without the .json extension

    Raises:
        FileExistsError: if a file can already be read under this path
    """
    # there is no exists function, so the existence check is done by reading the file
    try:
        self.get(storage_path, job_id)
    except FileNotFoundError:
        # the path is free: serialize the content and hand it to the string upload
        serialized = json.dumps(content_dict, default=datetime_handler)
        self.upload_string(serialized, storage_path, job_id)
    else:
        raise FileExistsError(
            f"The file {job_id} in {storage_path} already exists and should not be overwritten."
        )

upload_string(content_string, storage_path, job_id)

Upload the content_string as a json file to the dropbox

Parameters:

Name Type Description Default
content_string str

the content of the file that should be uploaded

required
storage_path str

the path where the file should be stored, but excluding the file name

required
job_id str

the name of the file without the .json extension

required
Source code in src/sqooler/storage_providers/dropbox.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
def upload_string(
    self,
    content_string: str,
    storage_path: str,
    job_id: str,
) -> None:
    """
    Write the content_string into a json file in the dropbox. An existing file under the
    same path is overwritten.

    Args:
        content_string: the content of the file that should be uploaded
        storage_path: the path where the file should be stored, but excluding the file name
        job_id: the name of the file without the .json extension
    """
    # no leading or trailing slashes, the separators are added here
    folder = storage_path.strip("/")
    file_name = job_id + ".json"
    target = f"/{folder}/{file_name}"

    client = dropbox.Dropbox(
        app_key=self.app_key,
        app_secret=self.app_secret,
        oauth2_refresh_token=self.refresh_token,
    )
    with client as dbx:
        # make sure that the access token is valid
        dbx.users_get_current_account()

        encoded = content_string.encode("utf-8")
        dbx.files_upload(encoded, target, mode=WriteMode("overwrite"))

DropboxProvider

Bases: DropboxProviderExtended

The class that implements the dropbox storage provider.

Source code in src/sqooler/storage_providers/dropbox.py
745
746
747
748
749
750
751
752
753
754
755
756
class DropboxProvider(DropboxProviderExtended):
    """
    The dropbox storage provider with fixed settings: it is always registered under the
    name "default" and it is always active.
    """

    def __init__(self, login_dict: DropboxLoginInformation) -> None:
        """
        Args:
            login_dict: The login information that is handed on to the parent class
        """
        super().__init__(
            login_dict,
            name="default",
            is_active=True,
        )

__init__(login_dict)

Parameters:

Name Type Description Default
login_dict DropboxLoginInformation

The dictionary that contains the login information

required
Source code in src/sqooler/storage_providers/dropbox.py
750
751
752
753
754
755
756
def __init__(self, login_dict: DropboxLoginInformation) -> None:
    """
    Set up the provider under the name "default" and mark it as active.

    Args:
        login_dict: The login information that is handed on to the parent class
    """
    super().__init__(
        login_dict,
        name="default",
        is_active=True,
    )

DropboxProviderExtended

Bases: StorageProvider, DropboxCore

The class that implements the dropbox storage provider.

Attributes:

Name Type Description
configs_path PathStr

The path to the folder where the configurations are stored

queue_path PathStr

The path to the folder where the jobs are stored

running_path PathStr

The path to the folder where the running jobs are stored

finished_path PathStr

The path to the folder where the finished jobs are stored

deleted_path PathStr

The path to the folder where the deleted jobs are stored

status_path PathStr

The path to the folder where the status is stored

results_path PathStr

The path to the folder where the results are stored

pks_path PathStr

The path to the folder where the public keys are stored

Source code in src/sqooler/storage_providers/dropbox.py
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
class DropboxProviderExtended(StorageProvider, DropboxCore):
    """
    The class that implements the dropbox storage provider.

    Attributes:
        configs_path: The path to the folder where the configurations are stored
        queue_path: The path to the folder where the jobs are stored
        running_path: The path to the folder where the running jobs are stored
        finished_path: The path to the folder where the finished jobs are stored
        deleted_path: The path to the folder where the deleted jobs are stored
        status_path: The path to the folder where the status is stored
        results_path: The path to the folder where the results are stored
        pks_path: The path to the folder where the public keys are stored

    """

    configs_path: PathStr = "Backend_files/Config"
    queue_path: PathStr = "Backend_files/Queued_Jobs"
    running_path: PathStr = "Backend_files/Running_Jobs"
    finished_path: PathStr = "Backend_files/Finished_Jobs"
    deleted_path: PathStr = "Backend_files/Deleted_Jobs"
    status_path: PathStr = "Backend_files/Status"
    results_path: PathStr = "Backend_files/Result"
    pks_path: PathStr = "Backend_files/public_keys"

    def get_attribute_path(
        self,
        attribute_name: AttributePathStr,
        display_name: Optional[DisplayNameStr] = None,
        job_id: Optional[str] = None,
        username: Optional[str] = None,
    ) -> str:
        """
        Get the path to the results of the device.

        Args:
            display_name: The name of the backend
            attribute_name: The name of the attribute
            job_id: The job_id of the job
            username: The username of the user that is uploading the job

        Returns:
            The path to the results of the device.
        """

        match attribute_name:
            case "configs":
                if display_name is None:
                    raise ValueError("The display_name must be set for configs_path.")
                path = f"{self.configs_path}/{display_name}"
            case "results":
                if job_id is None:
                    raise ValueError("The job_id must be set for results path.")
                extracted_username = job_id.split("-")[2]
                path = f"/{self.results_path}/{display_name}/{extracted_username}/"
            case "running":
                path = self.running_path
            case "status":
                path = f"/{self.status_path}/{display_name}/{username}/"
            case "queue":
                path = f"/{self.queue_path}/{display_name}/"
            case "deleted":
                path = self.deleted_path
            case "finished":
                path = f"/{self.finished_path}/{display_name}/{username}/"
            case "pks":
                path = self.pks_path
            case _:
                raise ValueError(f"The attribute name {attribute_name} is not valid.")
        return path

    def get_attribute_id(
        self,
        attribute_name: AttributeIdStr,
        job_id: str,
        display_name: Optional[DisplayNameStr] = None,
    ) -> str:
        """
        Get the path to the id of the device.

        Args:
            attribute_name: The name of the attribute
            job_id: The job_id of the job
            display_name: The name of the backend

        Returns:
            The path to the results of the device.
        """

        match attribute_name:
            case "configs":
                _id = "config"
            case "job":
                _id = f"job-{job_id}"
            case "results":
                _id = "result-" + job_id
            case "status":
                _id = "status-" + job_id
            case _:
                raise ValueError(f"The attribute name {attribute_name} is not valid.")
        return _id

    def update_config(
        self,
        config_dict: BackendConfigSchemaIn,
        display_name: DisplayNameStr,
        private_jwk: Optional[JWK] = None,
    ) -> None:
        """
        The function that updates the spooler configuration to the storage.

        All the configurations are stored in the Backend_files/Config folder.
        For each backend there is a separate folder in which the configuration is stored as a json file.

        Args:
            config_dict: The dictionary containing the configuration
            display_name : The name of the backend
            private_jwk: The private JWK to sign the configuration with

        Returns:
            None
        """

        config_dict = self._verify_config(config_dict, display_name)
        # check that the file exists
        config_path = self.get_attribute_path("configs", display_name=display_name)
        old_config_jws = self.get(config_path, "config")

        upload_dict = self._format_update_config(
            old_config_jws, config_dict, private_jwk
        )

        self.update(upload_dict, config_path, "config")

    def _delete_config(self, display_name: DisplayNameStr) -> bool:
        """
        Delete a config from the storage. This is only intended for test purposes.

        Args:
            display_name: The name of the backend to which we want to upload the job

        Raises:
            FileNotFoundError: If the status does not exist.

        Returns:
            Success if the file was deleted successfully
        """
        config_path = f"{self.configs_path}/{display_name}"

        self.delete(storage_path=config_path, job_id="config")
        self.delete_folder(config_path)
        return True

    def upload_public_key(
        self, public_jwk: JWK, display_name: DisplayNameStr, role: PksStr = "backend"
    ) -> None:
        """
        The function that uploads the spooler public JWK to the storage.

        Args:
            public_jwk: The JWK that contains the public key
            display_name : The name of the backend
            role: The role of the public key

        Returns:
            None
        """
        # first make sure that the public key is intended for verification
        if not public_jwk.key_ops == "verify":
            raise ValueError("The key is not intended for verification")

        # make sure that the key does not contain a private key
        if public_jwk.d is not None:
            raise ValueError("The key contains a private key")

        # make sure that the key has the correct kid
        if role == "backend":
            config_dict = self.get_config(display_name)
            if public_jwk.kid != config_dict.kid:
                raise ValueError("The key does not have the correct kid.")

        pks_path = self.get_attribute_path("pks")
        self.upload_string(public_jwk.model_dump_json(), pks_path, public_jwk.kid)

    def get_public_key_from_kid(self, kid: str) -> JWK:
        """
        The function that gets public JWK based on the key id.

        Args:
            kid : The key id of the backend

        Returns:
            JWk : The public JWK object
        """

        pks_path = self.get_attribute_path("pks")
        public_jwk_dict = self.get(storage_path=pks_path, job_id=kid)
        return JWK(**public_jwk_dict)

    def _delete_public_key(self, kid: str) -> bool:
        """
        Delete a public key from the storage. This is only intended for test purposes.

        Args:
            kid: The key id of the public key

        Raises:
            FileNotFoundError: If the status does not exist.

        Returns:
            Success if the file was deleted successfully
        """
        pks_path = self.get_attribute_path("pks")
        self.delete(storage_path=pks_path, job_id=kid)
        return True

    def update_in_database(
        self,
        result_dict: ResultDict,
        status_msg_dict: StatusMsgDict,
        job_id: str,
        display_name: DisplayNameStr,
        private_jwk: Optional[JWK] = None,
    ) -> None:
        """
        Upload the status and result to the dropbox.

        Args:
            result_dict: the dictionary containing the result of the job
            status_msg_dict: the dictionary containing the status message of the job
            job_id: the name of the job
            display_name: the name of the backend
            private_jwk: the private JWK to sign the result with

        Returns:
            None
        """
        # this should become part of the json file instead of its name in the future
        extracted_username = job_id.split("-")[2]

        status_json_dir = self.get_attribute_path(
            "status", display_name, extracted_username
        )

        status_json_name = self.get_attribute_id("status", job_id=job_id)

        job_json_name = self.get_attribute_id("job", job_id)
        job_json_start_dir = self.get_attribute_path("running")

        if status_msg_dict.status == "DONE":
            self.upload_result(
                result_dict,
                display_name,
                job_id,
                private_jwk,
            )
            # now move the job out of the running jobs into the finished jobs
            job_finished_json_dir = self.get_attribute_path(
                "finished", display_name=display_name, username=extracted_username
            )

            self.move(job_json_start_dir, job_finished_json_dir, job_json_name)

        elif status_msg_dict.status == "ERROR":
            # because there was an error, we move the job to the deleted jobs
            deleted_json_dir = self.get_attribute_path("deleted", display_name)
            self.move(job_json_start_dir, deleted_json_dir, job_json_name)

        try:
            self.update(status_msg_dict.model_dump(), status_json_dir, status_json_name)
        except FileNotFoundError:
            logging.warning(
                "The status file was missing for %s with job_id %s was missing.",
                display_name,
                job_id,
            )
            self.upload_status(display_name, username=extracted_username, job_id=job_id)
            self.update(status_msg_dict.model_dump(), status_json_dir, status_json_name)

    def get_file_queue(self, storage_path: str) -> list[str]:
        """
        Get a list of files. Typically we are looking for the queued jobs of a backend here.

        Args:
            storage_path: Where are we looking for the files.

        Returns:
            A list of files that was found.
        """

        # strip trailing and leading slashes from the paths
        storage_path = storage_path.strip("/")

        storage_path = "/" + storage_path.strip("/") + "/"

        # Create an instance of a Dropbox class, which can make requests to the API.
        names: list[str] = []
        with dropbox.Dropbox(
            app_key=self.app_key,
            app_secret=self.app_secret,
            oauth2_refresh_token=self.refresh_token,
        ) as dbx:
            # Check that the access token is valid
            try:
                dbx.users_get_current_account()
            except AuthError:
                sys.exit("ERROR: Invalid access token.")
            # We should really handle these exceptions cleaner, but this seems a bit
            # complicated right now
            # pylint: disable=W0703
            try:

                # we have too loop as dropbox somehow sometimes only returns a part of the files
                file_list = []  # collects all files here
                has_more_files = True  # because we haven't queried yet
                cursor = None  # because we haven't queried yet
                while has_more_files:
                    if cursor is None:  # if it is our first time querying
                        folders_results = dbx.files_list_folder(storage_path)
                    else:
                        # we can ignore the mypy error that this is unreachable as the cursor is
                        # set in the while loop
                        folders_results = dbx.files_list_folder_continue(cursor)  # type: ignore
                    file_list.extend(folders_results.entries)
                    cursor = folders_results.cursor
                    has_more_files = folders_results.has_more

                file_list = [item.name for item in file_list]
                json_files = [item for item in file_list if item.endswith(".json")]

                # Get the backend names
                names = [file_name.split(".")[0] for file_name in json_files]

            except ApiError:
                print(f"Could not obtain job queue for {storage_path}")
            except Exception as err:
                print(err)
        return names

    @validate_active
    def get_backends(self) -> list[str]:
        """
        Get a list of all the backends that the provider offers.
        """

        # strip possible trailing and leading slashes from the path
        config_path = self.configs_path.strip("/")

        # and now add them nicely
        full_config_path = f"/{config_path}/"
        with dropbox.Dropbox(
            app_key=self.app_key,
            app_secret=self.app_secret,
            oauth2_refresh_token=self.refresh_token,
        ) as dbx:
            # Check that the access token is valid
            try:
                dbx.users_get_current_account()
            except AuthError:
                sys.exit("ERROR: Invalid access token.")

            # we have too loop as dropbox somehow sometimes only returns a part of the files
            file_list = []  # collects all files here
            has_more_files = True  # because we haven't queried yet
            cursor = None  # because we haven't queried yet
            while has_more_files:
                if cursor is None:  # if it is our first time querying
                    folders_results = dbx.files_list_folder(path=full_config_path)
                else:
                    # we can ignore the mypy error that this is unreachable as the cursor is
                    # set in the while loop
                    folders_results = dbx.files_list_folder_continue(cursor)  # type: ignore
                file_list.extend(folders_results.entries)
                cursor = folders_results.cursor
                has_more_files = folders_results.has_more

            backend_names = []
            for entry in file_list:
                backend_names.append(entry.name)
        return backend_names

    def get_config(self, display_name: DisplayNameStr) -> BackendConfigSchemaIn:
        """
        The function that downloads the spooler configuration to the storage.

        Args:
            display_name : The name of the backend

        Raises:
            FileNotFoundError: If the backend does not exist

        Returns:
            The configuration of the backend in complete form.
        """
        backend_json_path = f"{self.configs_path}/{display_name}"
        backend_config_dict = self.get(storage_path=backend_json_path, job_id="config")
        typed_config = self._adapt_get_config(backend_config_dict)
        return typed_config

    def create_job_id(self, display_name: DisplayNameStr, username: str) -> str:
        """
        Create a job id for the job.

        Args:
            display_name: The name of the backend
            username: The username of the user that is uploading the job

        Returns:
            The job id
        """
        job_id = (
            (datetime.datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S"))
            + "-"
            + display_name
            + "-"
            + username
            + "-"
            + (uuid.uuid4().hex)[:5]
        )
        return job_id

    def _delete_status(
        self, display_name: DisplayNameStr, username: str, job_id: str
    ) -> bool:
        """
        Delete a status from the storage. This is only intended for test purposes.

        Args:
            display_name: The name of the backend to which we want to upload the job
            username: The username of the user that is uploading the job
            job_id: The job_id of the job that we want to upload the status for

        Raises:
            FileNotFoundError: If the status does not exist.

        Returns:
            Success if the file was deleted successfully
        """

        status_json_dir = self.get_attribute_path("status", display_name, username)

        status_json_name = self.get_attribute_id("status", job_id)

        self.delete(storage_path=status_json_dir, job_id=status_json_name)
        self.delete_folder(status_json_dir)
        return True

    def _delete_result(self, display_name: DisplayNameStr, job_id: str) -> bool:
        """
        Delete a result from the storage. This is only intended for test purposes.

        Args:
            display_name: The name of the backend to which we want to upload the job
            username: The username of the user that is uploading the job
            job_id: The job_id of the job that we want to upload the status for

        Raises:
            FileNotFoundError: If the result does not exist.

        Returns:
            Success if the file was deleted successfully
        """
        result_device_dir = self.get_attribute_path("results", display_name, job_id)
        self.delete_folder(result_device_dir)
        return True

create_job_id(display_name, username)

Create a job id for the job.

Parameters:

Name Type Description Default
display_name DisplayNameStr

The name of the backend

required
username str

The username of the user that is uploading the job

required

Returns:

Type Description
str

The job id

Source code in src/sqooler/storage_providers/dropbox.py
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
def create_job_id(self, display_name: DisplayNameStr, username: str) -> str:
    """
    Build a new identifier for a job.

    The identifier consists of the current UTC time, the name of the backend,
    the username and the first five hex characters of a random uuid, all of
    them joined by a dash.

    Args:
        display_name: The name of the backend
        username: The username of the user that is uploading the job

    Returns:
        The job id
    """
    timestamp = datetime.datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    random_suffix = uuid.uuid4().hex[:5]
    return "-".join((timestamp, display_name, username, random_suffix))

get_attribute_id(attribute_name, job_id, display_name=None)

Get the identifier under which an attribute of a job is stored.

Parameters:

Name Type Description Default
attribute_name AttributeIdStr

The name of the attribute

required
job_id str

The job_id of the job

required
display_name Optional[DisplayNameStr]

The name of the backend

None

Returns:

Type Description
str

The identifier of the requested attribute.

Source code in src/sqooler/storage_providers/dropbox.py
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
def get_attribute_id(
    self,
    attribute_name: AttributeIdStr,
    job_id: str,
    display_name: Optional[DisplayNameStr] = None,
) -> str:
    """
    Get the identifier under which an attribute of a job is stored.

    Args:
        attribute_name: The name of the attribute
        job_id: The job_id of the job
        display_name: The name of the backend. It is not used here.

    Raises:
        ValueError: If the attribute name is not known.

    Returns:
        The identifier of the attribute.
    """

    if attribute_name == "configs":
        # all configs share the same identifier
        return "config"
    if attribute_name == "job":
        return f"job-{job_id}"
    if attribute_name == "results":
        return "result-" + job_id
    if attribute_name == "status":
        return "status-" + job_id
    raise ValueError(f"The attribute name {attribute_name} is not valid.")

get_attribute_path(attribute_name, display_name=None, job_id=None, username=None)

Get the storage path that belongs to the requested attribute.

Parameters:

Name Type Description Default
display_name Optional[DisplayNameStr]

The name of the backend

None
attribute_name AttributePathStr

The name of the attribute

required
job_id Optional[str]

The job_id of the job

None
username Optional[str]

The username of the user that is uploading the job

None

Returns:

Type Description
str

The storage path of the requested attribute.

Source code in src/sqooler/storage_providers/dropbox.py
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
def get_attribute_path(
    self,
    attribute_name: AttributePathStr,
    display_name: Optional[DisplayNameStr] = None,
    job_id: Optional[str] = None,
    username: Optional[str] = None,
) -> str:
    """
    Get the storage path that belongs to an attribute.

    `running`, `deleted` and `pks` return the plain folder of the provider.
    The other attributes build a path from the optional arguments; note that
    `job_id` comes before `username` in the signature.

    Args:
        attribute_name: The name of the attribute
        display_name: The name of the backend
        job_id: The job_id of the job
        username: The username of the user that is uploading the job

    Raises:
        ValueError: If `display_name` is missing for `configs`, if `job_id` is
            missing for `results` or if the attribute name is not known.

    Returns:
        The path that belongs to the attribute.
    """

    if attribute_name == "configs":
        if display_name is None:
            raise ValueError("The display_name must be set for configs_path.")
        return f"{self.configs_path}/{display_name}"

    if attribute_name == "results":
        if job_id is None:
            raise ValueError("The job_id must be set for results path.")
        # the username is the third dash separated element of the job_id
        owner = job_id.split("-")[2]
        return f"/{self.results_path}/{display_name}/{owner}/"

    if attribute_name == "running":
        return self.running_path

    if attribute_name == "status":
        return f"/{self.status_path}/{display_name}/{username}/"

    if attribute_name == "queue":
        return f"/{self.queue_path}/{display_name}/"

    if attribute_name == "deleted":
        return self.deleted_path

    if attribute_name == "finished":
        return f"/{self.finished_path}/{display_name}/{username}/"

    if attribute_name == "pks":
        return self.pks_path

    raise ValueError(f"The attribute name {attribute_name} is not valid.")

get_backends()

Get a list of all the backends that the provider offers.

Source code in src/sqooler/storage_providers/dropbox.py
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
@validate_active
def get_backends(self) -> list[str]:
    """
    List the names of all backends that the provider offers.

    The names are the names of the entries in the config folder.

    Returns:
        The names of the backends.
    """

    # the dropbox path needs exactly one leading and one trailing slash
    listing_path = "/" + self.configs_path.strip("/") + "/"

    with dropbox.Dropbox(
        app_key=self.app_key,
        app_secret=self.app_secret,
        oauth2_refresh_token=self.refresh_token,
    ) as dbx:
        # Check that the access token is valid
        try:
            dbx.users_get_current_account()
        except AuthError:
            sys.exit("ERROR: Invalid access token.")

        # dropbox may deliver the folder content in several pages, so keep on
        # asking until nothing is left
        page = dbx.files_list_folder(path=listing_path)
        entries = list(page.entries)
        while page.has_more:
            page = dbx.files_list_folder_continue(page.cursor)
            entries.extend(page.entries)

        backend_names = [entry.name for entry in entries]
    return backend_names

get_config(display_name)

The function that downloads the spooler configuration from the storage.

Parameters:

Name Type Description Default
display_name

The name of the backend

required

Raises:

Type Description
FileNotFoundError

If the backend does not exist

Returns:

Type Description
BackendConfigSchemaIn

The configuration of the backend in complete form.

Source code in src/sqooler/storage_providers/dropbox.py
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
def get_config(self, display_name: DisplayNameStr) -> BackendConfigSchemaIn:
    """
    Download the spooler configuration of a backend from the storage.

    Args:
        display_name : The name of the backend

    Raises:
        FileNotFoundError: If the backend does not exist

    Returns:
        The configuration of the backend in complete form.
    """
    # each backend has its own folder, which contains a file called config
    raw_config = self.get(
        storage_path=f"{self.configs_path}/{display_name}", job_id="config"
    )
    return self._adapt_get_config(raw_config)

get_file_queue(storage_path)

Get a list of files. Typically we are looking for the queued jobs of a backend here.

Parameters:

Name Type Description Default
storage_path str

Where are we looking for the files.

required

Returns:

Type Description
list[str]

A list of files that was found.

Source code in src/sqooler/storage_providers/dropbox.py
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
def get_file_queue(self, storage_path: str) -> list[str]:
    """
    Get a list of files. Typically we are looking for the queued jobs of a backend here.

    Only json files are considered and they are returned without the `.json`
    extension. Errors while listing the folder are logged and lead to an
    empty list.

    Args:
        storage_path: Where are we looking for the files.

    Returns:
        A list of files that was found.
    """

    # make sure that the path has exactly one leading and one trailing slash
    storage_path = "/" + storage_path.strip("/") + "/"

    # Create an instance of a Dropbox class, which can make requests to the API.
    names: list[str] = []
    with dropbox.Dropbox(
        app_key=self.app_key,
        app_secret=self.app_secret,
        oauth2_refresh_token=self.refresh_token,
    ) as dbx:
        # Check that the access token is valid
        try:
            dbx.users_get_current_account()
        except AuthError:
            sys.exit("ERROR: Invalid access token.")
        # We should really handle these exceptions cleaner, but this seems a bit
        # complicated right now
        # pylint: disable=W0703
        try:
            # we have to loop as dropbox may return the files in several pages
            page = dbx.files_list_folder(storage_path)
            entries = list(page.entries)
            while page.has_more:
                page = dbx.files_list_folder_continue(page.cursor)
                entries.extend(page.entries)

            # only the suffix is removed, such that names which contain a dot
            # themselves are not cut off
            names = [
                entry.name.removesuffix(".json")
                for entry in entries
                if entry.name.endswith(".json")
            ]

        except ApiError:
            logging.warning("Could not obtain job queue for %s", storage_path)
        except Exception:
            logging.exception(
                "Unexpected error while obtaining job queue for %s", storage_path
            )
    return names

get_public_key_from_kid(kid)

The function that gets public JWK based on the key id.

Parameters:

Name Type Description Default
kid

The key id of the backend

required

Returns:

Name Type Description
JWk JWK

The public JWK object

Source code in src/sqooler/storage_providers/dropbox.py
462
463
464
465
466
467
468
469
470
471
472
473
474
475
def get_public_key_from_kid(self, kid: str) -> JWK:
    """
    Fetch the public JWK that is stored under the given key id.

    Args:
        kid : The key id of the backend

    Returns:
        JWK : The public JWK object
    """

    # the public keys are stored under their kid in the pks folder
    key_fields = self.get(storage_path=self.get_attribute_path("pks"), job_id=kid)
    return JWK(**key_fields)

update_config(config_dict, display_name, private_jwk=None)

The function that updates the spooler configuration to the storage.

All the configurations are stored in the Backend_files/Config folder. For each backend there is a separate folder in which the configuration is stored as a json file.

Parameters:

Name Type Description Default
config_dict BackendConfigSchemaIn

The dictionary containing the configuration

required
display_name

The name of the backend

required
private_jwk Optional[JWK]

The private JWK to sign the configuration with

None

Returns:

Type Description
None

None

Source code in src/sqooler/storage_providers/dropbox.py
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
def update_config(
    self,
    config_dict: BackendConfigSchemaIn,
    display_name: DisplayNameStr,
    private_jwk: Optional[JWK] = None,
) -> None:
    """
    Update the spooler configuration that is kept on the storage.

    All the configurations are stored in the Backend_files/Config folder.
    For each backend there is a separate folder in which the configuration is stored as a json file.

    Args:
        config_dict: The dictionary containing the configuration
        display_name : The name of the backend
        private_jwk: The private JWK to sign the configuration with

    Returns:
        None
    """

    verified_config = self._verify_config(config_dict, display_name)

    # the stored config has to be fetched first as it is needed to format the update
    storage_dir = self.get_attribute_path("configs", display_name=display_name)
    stored_config = self.get(storage_dir, "config")

    self.update(
        self._format_update_config(stored_config, verified_config, private_jwk),
        storage_dir,
        "config",
    )

update_in_database(result_dict, status_msg_dict, job_id, display_name, private_jwk=None)

Upload the status and result to the dropbox.

Parameters:

Name Type Description Default
result_dict ResultDict

the dictionary containing the result of the job

required
status_msg_dict StatusMsgDict

the dictionary containing the status message of the job

required
job_id str

the name of the job

required
display_name DisplayNameStr

the name of the backend

required
private_jwk Optional[JWK]

the private JWK to sign the result with

None

Returns:

Type Description
None

None

Source code in src/sqooler/storage_providers/dropbox.py
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
def update_in_database(
    self,
    result_dict: ResultDict,
    status_msg_dict: StatusMsgDict,
    job_id: str,
    display_name: DisplayNameStr,
    private_jwk: Optional[JWK] = None,
) -> None:
    """
    Upload the status and result to the dropbox.

    Depending on the status of the job, the job file is also moved out of the
    running jobs:

    - ``DONE``: the result is uploaded and the job is moved to the finished jobs.
    - ``ERROR``: the job is moved to the deleted jobs.
    - any other status: the job file stays where it is.

    Afterwards the status file is updated. If it does not exist, a warning is
    logged, the status file is created again and the update is repeated.

    Args:
        result_dict: the dictionary containing the result of the job
        status_msg_dict: the dictionary containing the status message of the job
        job_id: the identifier of the job. The username is read from its third
            dash-separated field, so an id with fewer fields raises an IndexError.
        display_name: the name of the backend
        private_jwk: the private JWK to sign the result with

    Returns:
        None
    """
    # this should become part of the json file instead of its name in the future
    extracted_username = job_id.split("-")[2]

    status_json_dir = self.get_attribute_path(
        "status", display_name, extracted_username
    )

    status_json_name = self.get_attribute_id("status", job_id=job_id)

    job_json_name = self.get_attribute_id("job", job_id)
    job_json_start_dir = self.get_attribute_path("running")

    if status_msg_dict.status == "DONE":
        self.upload_result(
            result_dict,
            display_name,
            job_id,
            private_jwk,
        )
        # now move the job out of the running jobs into the finished jobs
        job_finished_json_dir = self.get_attribute_path(
            "finished", display_name=display_name, username=extracted_username
        )

        self.move(job_json_start_dir, job_finished_json_dir, job_json_name)

    elif status_msg_dict.status == "ERROR":
        # because there was an error, we move the job to the deleted jobs
        deleted_json_dir = self.get_attribute_path("deleted", display_name)
        self.move(job_json_start_dir, deleted_json_dir, job_json_name)

    try:
        self.update(status_msg_dict.model_dump(), status_json_dir, status_json_name)
    except FileNotFoundError:
        logging.warning(
            "The status file for %s with job_id %s was missing.",
            display_name,
            job_id,
        )
        # create the status file again, then retry the update that just failed
        self.upload_status(display_name, username=extracted_username, job_id=job_id)
        self.update(status_msg_dict.model_dump(), status_json_dir, status_json_name)

upload_public_key(public_jwk, display_name, role='backend')

The function that uploads the spooler public JWK to the storage.

Parameters:

Name Type Description Default
public_jwk JWK

The JWK that contains the public key

required
display_name DisplayNameStr

The name of the backend

required
role PksStr

The role of the public key

'backend'

Returns:

Type Description
None

None

Source code in src/sqooler/storage_providers/dropbox.py
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
def upload_public_key(
    self, public_jwk: JWK, display_name: DisplayNameStr, role: PksStr = "backend"
) -> None:
    """
    Validate the spooler public JWK and upload it to the storage.

    The key is serialized to json and uploaded under its kid to the path
    of the `pks` attribute.

    Args:
        public_jwk: The JWK that contains the public key
        display_name: The name of the backend
        role: The role of the public key

    Returns:
        None

    Raises:
        ValueError: If the key is not intended for verification, if it contains
            a private key, or if the role is "backend" and the kid of the key
            differs from the kid in the configuration of the backend.
    """
    # only keys that are meant for verification may be uploaded
    is_verification_key = public_jwk.key_ops == "verify"
    if not is_verification_key:
        raise ValueError("The key is not intended for verification")

    # a set `d` means that the private part is included, which must never be uploaded
    if public_jwk.d is not None:
        raise ValueError("The key contains a private key")

    # for backends the kid has to agree with the one in the stored configuration
    if role == "backend":
        expected_kid = self.get_config(display_name).kid
        if public_jwk.kid != expected_kid:
            raise ValueError("The key does not have the correct kid.")

    self.upload_string(
        public_jwk.model_dump_json(),
        self.get_attribute_path("pks"),
        public_jwk.kid,
    )

Comments