Skip to content

API documentation of the local storage provider

The module that contains all the necessary logic for communication with the local storage providers.

LocalCore

Bases: StorageCore

Base class that creates the most important functions for the local storage provider.

Source code in src/sqooler/storage_providers/local.py
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
class LocalCore(StorageCore):
    """
    Base class that creates the most important functions for the local storage provider.
    """

    def __init__(
        self, login_dict: LocalLoginInformation, name: str, is_active: bool = True
    ) -> None:
        """
        Set up the necessary keys and create the client through which all the connections will run.

        Args:
            login_dict: The login dict that contains the necessary
                        information to connect to the local storage
            name: The name of the storage provider
            is_active: Is the storage provider active.

        Raises:
            ValidationError: If the login_dict is not valid
        """
        super().__init__(name, is_active)
        # root folder under which every storage_path is resolved
        self.base_path = login_dict.base_path

    @validate_active
    def upload(self, content_dict: Mapping, storage_path: str, job_id: str) -> None:
        """
        Upload the file to the storage.

        Args:
            content_dict: The dictionary containing the content of the file
            storage_path: The path to the file
            job_id: The id of the job

        Raises:
            FileExistsError: If the file already exists
        """
        # strip trailing and leading slashes from the storage_path
        storage_path = storage_path.strip("/")

        # make sure the json folder exists (exist_ok avoids a
        # check-then-create race)
        folder_path = os.path.join(self.base_path, storage_path)
        os.makedirs(folder_path, exist_ok=True)

        # create the full, normalized path
        file_name = job_id + ".json"
        secure_path = os.path.normpath(os.path.join(folder_path, file_name))
        # uploads must never silently overwrite existing content
        if os.path.exists(secure_path):
            raise FileExistsError(
                f"The file {secure_path} already exists and should not be overwritten."
            )
        with open(secure_path, "w", encoding="utf-8") as json_file:
            json.dump(content_dict, json_file, default=datetime_handler)

    @validate_active
    def get(self, storage_path: str, job_id: str) -> dict:
        """
        Get the file content from the storage.

        Args:
            storage_path: The path to the file
            job_id: The id of the job

        Raises:
            FileNotFoundError: If the file is not found

        Returns:
            The content of the json file as a dict.
        """
        # strip trailing and leading slashes from the storage_path
        storage_path = storage_path.strip("/")

        # create the full, normalized path
        file_name = job_id + ".json"
        secure_path = os.path.normpath(
            os.path.join(self.base_path, storage_path, file_name)
        )

        # does the file already exist ?
        if not os.path.exists(secure_path):
            raise FileNotFoundError(
                f"The file {secure_path} does not exist and cannot be loaded."
            )
        with open(secure_path, "r", encoding="utf-8") as json_file:
            loaded_data_dict = json.load(json_file)
        return loaded_data_dict

    @validate_active
    def update(self, content_dict: dict, storage_path: str, job_id: str) -> None:
        """
        Update the file content.

        Args:
            content_dict: The dictionary containing the new content of the file
            storage_path: The path to the file
            job_id: The id of the job

        Returns:
            None

        Raises:
            FileNotFoundError: If the file is not found
        """
        # strip trailing and leading slashes from the storage_path
        storage_path = storage_path.strip("/")

        # create the full, normalized path
        file_name = job_id + ".json"
        secure_path = os.path.normpath(
            os.path.join(self.base_path, storage_path, file_name)
        )

        # updates are only allowed on files that already exist
        if not os.path.exists(secure_path):
            raise FileNotFoundError(
                f"The file {secure_path} does not exist and cannot be updated."
            )
        with open(secure_path, "w", encoding="utf-8") as json_file:
            json.dump(content_dict, json_file, default=datetime_handler)

    @validate_active
    def move(self, start_path: str, final_path: str, job_id: str) -> None:
        """
        Move the file from `start_path` to `final_path`.

        Args:
            start_path: The folder (relative to the base path) the file is in
            final_path: The folder (relative to the base path) the file moves to
            job_id: The id of the job, i.e. the file name without extension
        """
        # strip slashes from BOTH paths for consistency (previously only
        # start_path was stripped)
        start_path = start_path.strip("/")
        final_path = final_path.strip("/")

        source_file = os.path.normpath(
            os.path.join(self.base_path, start_path, job_id + ".json")
        )

        target_dir = os.path.normpath(os.path.join(self.base_path, final_path))
        os.makedirs(target_dir, exist_ok=True)

        # Move the file into the target directory, keeping its name
        shutil.move(source_file, target_dir)

    @validate_active
    def delete(self, storage_path: str, job_id: str) -> None:
        """
        Delete the file from the storage

        Args:
            storage_path: the path where the file is currently stored, but excluding the file name
            job_id: the name of the file

        Raises:
            FileNotFoundError: If the file is not found

        Returns:
            None
        """
        storage_path = storage_path.strip("/")
        # normalize the path like the other accessors do
        source_file = os.path.normpath(
            os.path.join(self.base_path, storage_path, job_id + ".json")
        )
        os.remove(source_file)

__init__(login_dict, name, is_active=True)

Set up the necessary keys and create the client through which all the connections will run.

Parameters:

Name Type Description Default
login_dict LocalLoginInformation

The login dict that contains the necessary information to connect to the local storage

required
name str

The name of the storage provider

required
is_active bool

Is the storage provider active.

True

Raises:

Type Description
ValidationError

If the login_dict is not valid

Source code in src/sqooler/storage_providers/local.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def __init__(
    self, login_dict: LocalLoginInformation, name: str, is_active: bool = True
) -> None:
    """
    Set up the necessary keys and create the client through which all the connections will run.

    Args:
        login_dict: The login dict that contains the necessary
                    information to connect to the local storage
        name: The name of the storage provider
        is_active: Is the storage provider active.

    Raises:
        ValidationError: If the login_dict is not valid
    """
    super().__init__(name, is_active)
    # root folder under which every storage_path is resolved
    self.base_path = login_dict.base_path

delete(storage_path, job_id)

Delete the file from the storage

Parameters:

Name Type Description Default
storage_path str

the path where the file is currently stored, but excluding the file name

required
job_id str

the name of the file

required

Raises:

Type Description
FileNotFoundError

If the file is not found

Returns:

Type Description
None

None

Source code in src/sqooler/storage_providers/local.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
@validate_active
def delete(self, storage_path: str, job_id: str) -> None:
    """
    Delete the file from the storage

    Args:
        storage_path: the path where the file is currently stored, but excluding the file name
        job_id: the name of the file

    Raises:
        FileNotFoundError: If the file is not found

    Returns:
        None
    """
    # normalize the relative folder and build the file path from it
    trimmed_path = storage_path.strip("/")
    target_file = f"{self.base_path}/{trimmed_path}/{job_id}.json"
    os.remove(target_file)

get(storage_path, job_id)

Get the file content from the storage

Source code in src/sqooler/storage_providers/local.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
@validate_active
def get(self, storage_path: str, job_id: str) -> dict:
    """
    Get the file content from the storage
    """
    # normalize the relative folder, then assemble the full json path
    trimmed_path = storage_path.strip("/")
    json_path = os.path.normpath(
        os.path.join(self.base_path, trimmed_path, f"{job_id}.json")
    )

    # fail loudly when the requested file is absent
    if not os.path.exists(json_path):
        raise FileNotFoundError(
            f"The file {json_path} does not exist and cannot be loaded."
        )

    with open(json_path, "r", encoding="utf-8") as json_file:
        return json.load(json_file)

move(start_path, final_path, job_id)

Move the file from start_path to final_path

Source code in src/sqooler/storage_providers/local.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
@validate_active
def move(self, start_path: str, final_path: str, job_id: str) -> None:
    """
    Move the file from `start_path` to `final_path`
    """
    # source file lives under the (slash-trimmed) start folder
    trimmed_start = start_path.strip("/")
    source_file = f"{self.base_path}/{trimmed_start}/{job_id}.json"

    # make sure the destination folder exists before moving
    destination_dir = f"{self.base_path}/{final_path}/"
    if not os.path.exists(destination_dir):
        os.makedirs(destination_dir)

    shutil.move(source_file, destination_dir)

update(content_dict, storage_path, job_id)

Update the file content.

Parameters:

Name Type Description Default
content_dict dict

The dictionary containing the new content of the file

required
storage_path str

The path to the file

required
job_id str

The id of the job

required

Returns:

Type Description
None

None

Raises:

Type Description
FileNotFoundError

If the file is not found

Source code in src/sqooler/storage_providers/local.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
@validate_active
def update(self, content_dict: dict, storage_path: str, job_id: str) -> None:
    """
    Update the file content.

    Args:
        content_dict: The dictionary containing the new content of the file
        storage_path: The path to the file
        job_id: The id of the job

    Returns:
        None

    Raises:
        FileNotFoundError: If the file is not found
    """
    # normalize the relative folder, then assemble the full json path
    trimmed_path = storage_path.strip("/")
    json_path = os.path.normpath(
        os.path.join(self.base_path, trimmed_path, f"{job_id}.json")
    )

    # updates are only allowed on files that already exist
    if not os.path.exists(json_path):
        raise FileNotFoundError(
            f"The file {json_path} does not exist and cannot be updated."
        )

    with open(json_path, "w", encoding="utf-8") as json_file:
        json.dump(content_dict, json_file, default=datetime_handler)

upload(content_dict, storage_path, job_id)

Upload the file to the storage

Parameters:

Name Type Description Default
content_dict Mapping

The dictionary containing the content of the file

required
storage_path str

The path to the file

required
job_id str

The id of the job

required
Source code in src/sqooler/storage_providers/local.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
@validate_active
def upload(self, content_dict: Mapping, storage_path: str, job_id: str) -> None:
    """
    Upload the file to the storage

    Args:
        content_dict: The dictionary containing the content of the file
        storage_path: The path to the file
        job_id: The id of the job
    """
    # normalize the relative folder and create it if it is missing
    trimmed_path = storage_path.strip("/")
    target_folder = f"{self.base_path}/{trimmed_path}"
    if not os.path.exists(target_folder):
        os.makedirs(target_folder)

    # assemble the full json path for the job
    json_path = os.path.normpath(os.path.join(target_folder, f"{job_id}.json"))

    # uploads must never silently overwrite existing content
    if os.path.exists(json_path):
        raise FileExistsError(
            f"The file {json_path} already exists and should not be overwritten."
        )

    with open(json_path, "w", encoding="utf-8") as json_file:
        json.dump(content_dict, json_file, default=datetime_handler)

LocalProvider

Bases: LocalProviderExtended

Create a file storage that works on the local machine.

Source code in src/sqooler/storage_providers/local.py
597
598
599
600
601
602
603
604
605
606
class LocalProvider(LocalProviderExtended):
    """
    Create a file storage that works on the local machine.
    """

    def __init__(self, login_dict: LocalLoginInformation) -> None:
        """
        Set up the necessary keys and create the client through which all
        the connections will run.

        Args:
            login_dict: The login information for the local storage.
        """
        # The plain local provider is the extended provider pinned to the
        # default name and always active.
        super().__init__(login_dict=login_dict, name="default", is_active=True)

__init__(login_dict)

Set up the necessary keys and create the client through which all the connections will run.

Source code in src/sqooler/storage_providers/local.py
602
603
604
605
606
def __init__(self, login_dict: LocalLoginInformation) -> None:
    """
    Set up the necessary keys and create the client through which all the connections will run.

    Args:
        login_dict: The login information for the local storage.
    """
    # delegate to the extended provider with the default name, always active
    super().__init__(login_dict, name="default", is_active=True)

LocalProviderExtended

Bases: StorageProvider, LocalCore

Create a file storage that works on the local machine.

Attributes:

Name Type Description
configs_path PathStr

The path to the folder where the configurations are stored

queue_path PathStr

The path to the folder where the jobs are stored

running_path PathStr

The path to the folder where the running jobs are stored

finished_path PathStr

The path to the folder where the finished jobs are stored

deleted_path PathStr

The path to the folder where the deleted jobs are stored

status_path PathStr

The path to the folder where the status is stored

results_path PathStr

The path to the folder where the results are stored

pks_path PathStr

The path to the folder where the public keys are stored

Source code in src/sqooler/storage_providers/local.py
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
class LocalProviderExtended(StorageProvider, LocalCore):
    """
    Create a file storage that works on the local machine.

    Attributes:
        configs_path: The path to the folder where the configurations are stored
        queue_path: The path to the folder where the jobs are stored
        running_path: The path to the folder where the running jobs are stored
        finished_path: The path to the folder where the finished jobs are stored
        deleted_path: The path to the folder where the deleted jobs are stored
        status_path: The path to the folder where the status is stored
        results_path: The path to the folder where the results are stored
        pks_path: The path to the folder where the public keys are stored
    """

    configs_path: PathStr = "backends/configs"
    queue_path: PathStr = "jobs/queued"
    running_path: PathStr = "jobs/running"
    finished_path: PathStr = "jobs/finished"
    deleted_path: PathStr = "jobs/deleted"
    status_path: PathStr = "status"
    results_path: PathStr = "results"
    pks_path: PathStr = "backends/public_keys"

    def get_attribute_path(
        self,
        attribute_name: AttributePathStr,
        display_name: Optional[DisplayNameStr] = None,
        job_id: Optional[str] = None,
        username: Optional[str] = None,
    ) -> str:
        """
        Get the path to the attribute of the device.

        Args:
            display_name: The name of the backend
            attribute_name: The name of the attribute
            job_id: The job_id of the job
            username: The username of the user

        Returns:
            The path to the results of the device.
        """

        match attribute_name:
            case "configs":
                path = self.configs_path
            case "results":
                path = f"{self.results_path}/{display_name}"
            case "running":
                path = self.running_path
            case "status":
                path = f"{self.status_path}/{display_name}"
            case "queue":
                path = f"{self.queue_path}/{display_name}"
            case "deleted":
                path = self.deleted_path
            case "finished":
                path = f"{self.finished_path}/{display_name}"
            case "pks":
                path = self.pks_path
            case _:
                raise ValueError(f"The attribute name {attribute_name} is not valid.")
        return path

    def get_attribute_id(
        self,
        attribute_name: AttributeIdStr,
        job_id: str,
        display_name: Optional[DisplayNameStr] = None,
    ) -> str:
        """
        Get the path to the id of the device.

        Args:
            attribute_name: The name of the attribute
            job_id: The job_id of the job
            display_name: The name of the backend

        Returns:
            The path to the results of the device.
        """
        match attribute_name:
            case "configs":
                if display_name is None:
                    raise ValueError("The display_name is missing")
                _id = display_name
            case "job":
                _id = job_id
            case "results":
                _id = job_id
            case "status":
                _id = job_id
            case _:
                raise ValueError(f"The attribute name {attribute_name} is not valid.")
        return _id

    def get_backends(self) -> list[DisplayNameStr]:
        """
        Get a list of all the backends that the provider offers.
        """
        return self.get_file_queue(self.configs_path)

    def create_job_id(self, display_name: DisplayNameStr, username: str) -> str:
        """
        Create a job id for the job.

        Returns:
            The job id
        """
        return (uuid.uuid4().hex)[:24]

    def _delete_status(
        self, display_name: DisplayNameStr, username: str, job_id: str
    ) -> bool:
        """
        Delete a status from the storage. This is only intended for test purposes.

        Args:
            display_name: The name of the backend to which we want to upload the job
            username: The username of the user that is uploading the job
            job_id: The job_id of the job that we want to upload the status for

        Raises:
            FileNotFoundError: If the status does not exist.

        Returns:
            Success if the file was deleted successfully
        """
        status_json_dir = self.get_attribute_path("status", display_name)

        self.delete(storage_path=status_json_dir, job_id=job_id)
        return True

    def _delete_result(self, display_name: DisplayNameStr, job_id: str) -> bool:
        """
        Delete a result from the storage. This is only intended for test purposes.

        Args:
            display_name: The name of the backend to which we want to upload the job
            username: The username of the user that is uploading the job
            job_id: The job_id of the job that we want to upload the status for

        Raises:
            FileNotFoundError: If the result does not exist.

        Returns:
            Success if the file was deleted successfully
        """

        result_json_dir = self.get_attribute_path("results", display_name, job_id)
        self.delete(storage_path=result_json_dir, job_id=job_id)
        return True

    def update_config(
        self,
        config_dict: BackendConfigSchemaIn,
        display_name: DisplayNameStr,
        private_jwk: Optional[JWK] = None,
    ) -> None:
        """
        The function that updates the spooler configuration on the storage.

        Args:
            config_dict: The dictionary containing the configuration
            display_name : The name of the backend
            private_jwk: The private key of the backend

        Returns:
            None
        """

        config_dict = self._verify_config(config_dict, display_name)
        # path of the configs
        config_path = os.path.join(self.base_path, self.configs_path)
        config_path = os.path.normpath(config_path)

        file_name = display_name + ".json"
        full_json_path = os.path.join(config_path, file_name)
        secure_path = os.path.normpath(full_json_path)

        # check if the file already exists
        if not os.path.exists(secure_path):
            raise FileNotFoundError(
                (
                    f"The file {secure_path} does not exist and should not be updated."
                    "Use the upload_config method instead."
                )
            )

        # now read the old config
        with open(secure_path, "r", encoding="utf-8") as json_file:
            old_config_jws = json.load(json_file)

        upload_dict = self._format_update_config(
            old_config_jws, config_dict, private_jwk
        )

        self.update(
            content_dict=upload_dict,
            storage_path=self.configs_path,
            job_id=display_name,
        )

    def get_config(self, display_name: DisplayNameStr) -> BackendConfigSchemaIn:
        """
        The function that downloads the spooler configuration to the storage.

        Args:
            display_name : The name of the backend

        Raises:
            FileNotFoundError: If the backend does not exist

        Returns:
            The configuration of the backend in complete form.
        """
        # path of the configs
        backend_config_dict = self.get(self.configs_path, job_id=display_name)
        typed_config = self._adapt_get_config(backend_config_dict)
        return typed_config

    def _delete_config(self, display_name: DisplayNameStr) -> bool:
        """
        Delete a config from the storage. This is only intended for test purposes.

        Args:
            display_name: The name of the backend to which we want to upload the job

        Raises:
            FileNotFoundError: If the status does not exist.

        Returns:
            Success if the file was deleted successfully
        """

        self.delete(storage_path=self.configs_path, job_id=display_name)
        return True

    def upload_public_key(
        self, public_jwk: JWK, display_name: DisplayNameStr, role: PksStr = "backend"
    ) -> None:
        """
        The function that uploads the spooler public JWK to the storage.

        Args:
            public_jwk: The JWK that contains the public key
            display_name : The name of the backend
            role: The role of the public key

        Returns:
            None
        """
        # first make sure that the public key is intended for verification
        if not public_jwk.key_ops == "verify":
            raise ValueError("The key is not intended for verification")

        # make sure that the key does not contain a private key
        if public_jwk.d is not None:
            raise ValueError("The key contains a private key")

        # make sure that the key has the correct kid for the backend
        if role == "backend":
            config_dict = self.get_config(display_name)
            if public_jwk.kid != config_dict.kid:
                raise ValueError("The key does not have the correct kid.")

        # path of the public keys
        pks_path = self.get_attribute_path("pks")
        key_path = os.path.join(self.base_path, pks_path)
        key_path = os.path.normpath(key_path)

        # test if the key path already exists. If it does not, create it
        if not os.path.exists(key_path):
            os.makedirs(key_path)

        # this should most likely depend on the kid at some point
        file_name = f"{public_jwk.kid}.json"
        full_json_path = os.path.join(key_path, file_name)
        secure_path = os.path.normpath(full_json_path)
        with open(secure_path, "w", encoding="utf-8") as json_file:
            json_file.write(public_jwk.model_dump_json())

    def get_public_key_from_kid(self, kid: str) -> JWK:
        """
        The function that gets public JWK based on the key id.

        Args:
            kid : The key id of the backend

        Returns:
            JWk : The public JWK object
        """
        pks_path = self.get_attribute_path("pks")
        key_path = os.path.join(self.base_path, pks_path)
        file_name = f"{kid}.json"

        validate_filename(file_name)
        full_json_path = os.path.join(key_path, file_name)
        secure_path = os.path.normpath(full_json_path)
        with open(secure_path, "r", encoding="utf-8") as json_file:
            public_key_dict = json.load(json_file)

        if not public_key_dict:
            raise FileNotFoundError("The backend does not exist for the given storage.")

        return JWK(**public_key_dict)

    def _delete_public_key(self, kid: str) -> bool:
        """
        Delete a public key from the storage. This is only intended for test purposes.

        Args:
            kid: The key id of the public key

        Raises:
            FileNotFoundError: If the status does not exist.

        Returns:
            Success if the file was deleted successfully
        """
        pks_path = self.get_attribute_path("pks")
        self.delete(storage_path=pks_path, job_id=kid)
        return True

    def update_in_database(
        self,
        result_dict: ResultDict,
        status_msg_dict: StatusMsgDict,
        job_id: str,
        display_name: DisplayNameStr,
        private_jwk: Optional[JWK] = None,
    ) -> None:
        """
        Upload the status and result to the `StorageProvider`.

        Args:
            result_dict: the dictionary containing the result of the job
            status_msg_dict: the dictionary containing the status message of the job
            job_id: the name of the job
            display_name: the name of the backend
            private_jwk: the private key of the backend

        Returns:
            None
        """

        # this is an ugly hack to get the username
        if job_id.startswith("job-"):
            extracted_username = job_id.split("-")[2]
        else:
            extracted_username = None

        status_json_dir = self.get_attribute_path(
            "status", display_name, extracted_username
        )
        job_json_start_dir = self.get_attribute_path("running")

        status_json_name = self.get_attribute_id("status", job_id=job_id)
        job_json_name = self.get_attribute_id("job", job_id)

        # check if the job is done or had an error
        if status_msg_dict.status == "DONE":
            # test if the result dict is None
            if result_dict is None:
                raise ValueError(
                    "The 'result_dict' argument cannot be None if the job is done."
                )
            result_uploaded = self.upload_result(
                result_dict, display_name, job_id, private_jwk
            )
            if not result_uploaded:
                raise ValueError("The result was not uploaded successfully.")

            # now move the job out of the running jobs into the finished jobs
            job_finished_json_dir = self.get_attribute_path(
                "finished", display_name=display_name
            )

            self.move(job_json_start_dir, job_finished_json_dir, job_json_name)

        elif status_msg_dict.status == "ERROR":
            # because there was an error, we move the job to the deleted jobs
            deleted_json_dir = self.get_attribute_path("deleted", display_name)
            self.move(job_json_start_dir, deleted_json_dir, job_json_name)

        # and create the status json file
        try:
            self.update(status_msg_dict.model_dump(), status_json_dir, status_json_name)
        except FileNotFoundError:
            logging.warning(
                "The status file was missing for %s with job_id %s was missing.",
                display_name,
                job_id,
            )
            self.upload_status(display_name, "", status_json_name)
            self.update(status_msg_dict.model_dump(), status_json_dir, status_json_name)

    def get_file_queue(self, storage_path: str) -> list[str]:
        """
        Get a list of files. Only json files are considered. And the ending of
        the file is removed.

        Args:
            storage_path: Where are we looking for the files.

        Returns:
            A list of files that was found.
        """
        # get a list of files in the folder
        full_path = self.base_path + "/" + storage_path
        # test if the path exists. Otherwise simply return an empty list
        if not os.path.exists(full_path):
            return []

        all_items = os.listdir(full_path)
        # Filter out only the JSON files
        json_files = [item for item in all_items if item.endswith(".json")]

        # Get the backend names
        names = [os.path.splitext(file_name)[0] for file_name in json_files]

        return names

create_job_id(display_name, username)

Create a job id for the job.

Returns:

Type Description
str

The job id

Source code in src/sqooler/storage_providers/local.py
275
276
277
278
279
280
281
282
def create_job_id(self, display_name: DisplayNameStr, username: str) -> str:
    """
    Create a job id for the job.

    Args:
        display_name: The name of the backend (accepted but not used by this
            implementation).
        username: The username of the user (accepted but not used by this
            implementation).

    Returns:
        The job id, a random 24-character hex string.
    """
    return (uuid.uuid4().hex)[:24]

get_attribute_id(attribute_name, job_id, display_name=None)

Get the path to the id of the device.

Parameters:

Name Type Description Default
attribute_name AttributeIdStr

The name of the attribute

required
job_id str

The job_id of the job

required
display_name Optional[DisplayNameStr]

The name of the backend

None

Returns:

Type Description
str

The path to the results of the device.

Source code in src/sqooler/storage_providers/local.py
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
def get_attribute_id(
    self,
    attribute_name: AttributeIdStr,
    job_id: str,
    display_name: Optional[DisplayNameStr] = None,
) -> str:
    """
    Get the path to the id of the device.

    For configs the id is the backend name; for jobs, results and status
    files it is the job id.

    Args:
        attribute_name: The name of the attribute
        job_id: The job_id of the job
        display_name: The name of the backend

    Returns:
        The path to the results of the device.
    """
    if attribute_name == "configs":
        # configs are stored per backend, so the display name is required
        if display_name is None:
            raise ValueError("The display_name is missing")
        return display_name
    if attribute_name in ("job", "results", "status"):
        # these three attributes are all keyed by the job id
        return job_id
    raise ValueError(f"The attribute name {attribute_name} is not valid.")

get_attribute_path(attribute_name, display_name=None, job_id=None, username=None)

Get the path to the attribute of the device.

Parameters:

Name Type Description Default
display_name Optional[DisplayNameStr]

The name of the backend

None
attribute_name AttributePathStr

The name of the attribute

required
job_id Optional[str]

The job_id of the job

None
username Optional[str]

The username of the user

None

Returns:

Type Description
str

The path to the results of the device.

Source code in src/sqooler/storage_providers/local.py
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
def get_attribute_path(
    self,
    attribute_name: AttributePathStr,
    display_name: Optional[DisplayNameStr] = None,
    job_id: Optional[str] = None,
    username: Optional[str] = None,
) -> str:
    """
    Get the path to the attribute of the device.

    Args:
        display_name: The name of the backend
        attribute_name: The name of the attribute
        job_id: The job_id of the job
        username: The username of the user

    Returns:
        The path to the results of the device.
    """
    # attributes that live in one folder per backend
    per_backend = {
        "results": self.results_path,
        "status": self.status_path,
        "queue": self.queue_path,
        "finished": self.finished_path,
    }
    # attributes that share a single folder for all backends
    shared = {
        "configs": self.configs_path,
        "running": self.running_path,
        "deleted": self.deleted_path,
        "pks": self.pks_path,
    }
    if attribute_name in per_backend:
        return f"{per_backend[attribute_name]}/{display_name}"
    if attribute_name in shared:
        return shared[attribute_name]
    raise ValueError(f"The attribute name {attribute_name} is not valid.")

get_backends()

Get a list of all the backends that the provider offers.

Source code in src/sqooler/storage_providers/local.py
269
270
271
272
273
def get_backends(self) -> list[DisplayNameStr]:
    """
    Get a list of all the backends that the provider offers.

    Returns:
        The display names of the backends, i.e. the json file names found
        in the configs folder.
    """
    return self.get_file_queue(self.configs_path)

get_config(display_name)

The function that downloads the spooler configuration to the storage.

Parameters:

Name Type Description Default
display_name

The name of the backend

required

Raises:

Type Description
FileNotFoundError

If the backend does not exist

Returns:

Type Description
BackendConfigSchemaIn

The configuration of the backend in complete form.

Source code in src/sqooler/storage_providers/local.py
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
def get_config(self, display_name: DisplayNameStr) -> BackendConfigSchemaIn:
    """
    Download the spooler configuration from the storage.

    Args:
        display_name : The name of the backend

    Raises:
        FileNotFoundError: If the backend does not exist

    Returns:
        The configuration of the backend in complete form.
    """
    # the raw config lives in the configs folder under the backend name
    raw_config = self.get(self.configs_path, job_id=display_name)
    return self._adapt_get_config(raw_config)

get_file_queue(storage_path)

Get a list of files. Only json files are considered, and the file ending is removed.

Parameters:

Name Type Description Default
storage_path str

Where are we looking for the files.

required

Returns:

Type Description
list[str]

A list of files that was found.

Source code in src/sqooler/storage_providers/local.py
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
def get_file_queue(self, storage_path: str) -> list[str]:
    """
    List the json files in a storage folder, with the ".json" ending
    stripped from each name.

    Args:
        storage_path: Where are we looking for the files.

    Returns:
        A list of files that was found.
    """
    folder = self.base_path + "/" + storage_path
    # a missing folder simply means that nothing was stored there yet
    if not os.path.exists(folder):
        return []

    # keep only the json files and strip their extension
    return [
        os.path.splitext(entry)[0]
        for entry in os.listdir(folder)
        if entry.endswith(".json")
    ]

get_public_key_from_kid(kid)

The function that gets public JWK based on the key id.

Parameters:

Name Type Description Default
kid

The key id of the backend

required

Returns:

Name Type Description
JWK JWK

The public JWK object

Source code in src/sqooler/storage_providers/local.py
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
def get_public_key_from_kid(self, kid: str) -> JWK:
    """
    Get the public JWK that belongs to a given key id.

    Args:
        kid : The key id of the backend

    Returns:
        JWK : The public JWK object
    """
    file_name = f"{kid}.json"
    # reject key ids that would escape the storage folder
    validate_filename(file_name)

    pks_dir = os.path.join(self.base_path, self.get_attribute_path("pks"))
    secure_path = os.path.normpath(os.path.join(pks_dir, file_name))
    with open(secure_path, "r", encoding="utf-8") as json_file:
        public_key_dict = json.load(json_file)

    # an empty file means that no key was ever stored there
    if not public_key_dict:
        raise FileNotFoundError("The backend does not exist for the given storage.")

    return JWK(**public_key_dict)

update_config(config_dict, display_name, private_jwk=None)

The function that updates the spooler configuration on the storage.

Parameters:

Name Type Description Default
config_dict BackendConfigSchemaIn

The dictionary containing the configuration

required
display_name

The name of the backend

required
private_jwk Optional[JWK]

The private key of the backend

None

Returns:

Type Description
None

None

Source code in src/sqooler/storage_providers/local.py
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
def update_config(
    self,
    config_dict: BackendConfigSchemaIn,
    display_name: DisplayNameStr,
    private_jwk: Optional[JWK] = None,
) -> None:
    """
    Update the spooler configuration on the storage.

    Args:
        config_dict: The dictionary containing the configuration
        display_name : The name of the backend
        private_jwk: The private key of the backend

    Returns:
        None
    """

    config_dict = self._verify_config(config_dict, display_name)

    # locate the existing config file for this backend
    config_dir = os.path.normpath(os.path.join(self.base_path, self.configs_path))
    secure_path = os.path.normpath(
        os.path.join(config_dir, display_name + ".json")
    )

    # an update is only allowed if a config was uploaded before
    if not os.path.exists(secure_path):
        raise FileNotFoundError(
            (
                f"The file {secure_path} does not exist and should not be updated."
                "Use the upload_config method instead."
            )
        )

    # read the previous (signed) config so it can be merged into the update
    with open(secure_path, "r", encoding="utf-8") as json_file:
        old_config_jws = json.load(json_file)

    upload_dict = self._format_update_config(
        old_config_jws, config_dict, private_jwk
    )

    self.update(
        content_dict=upload_dict,
        storage_path=self.configs_path,
        job_id=display_name,
    )

update_in_database(result_dict, status_msg_dict, job_id, display_name, private_jwk=None)

Upload the status and result to the StorageProvider.

Parameters:

Name Type Description Default
result_dict ResultDict

the dictionary containing the result of the job

required
status_msg_dict StatusMsgDict

the dictionary containing the status message of the job

required
job_id str

the name of the job

required
display_name DisplayNameStr

the name of the backend

required
private_jwk Optional[JWK]

the private key of the backend

None

Returns:

Type Description
None

None

Source code in src/sqooler/storage_providers/local.py
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
def update_in_database(
    self,
    result_dict: ResultDict,
    status_msg_dict: StatusMsgDict,
    job_id: str,
    display_name: DisplayNameStr,
    private_jwk: Optional[JWK] = None,
) -> None:
    """
    Upload the status and result to the `StorageProvider`.

    Args:
        result_dict: the dictionary containing the result of the job
        status_msg_dict: the dictionary containing the status message of the job
        job_id: the name of the job
        display_name: the name of the backend
        private_jwk: the private key of the backend

    Returns:
        None
    """

    # this is an ugly hack to get the username out of ids shaped like
    # "job-<...>-<username>-..."
    if job_id.startswith("job-"):
        extracted_username = job_id.split("-")[2]
    else:
        extracted_username = None

    # NOTE: the username must be passed by keyword; passing it positionally
    # would land in the `job_id` parameter of `get_attribute_path`.
    status_json_dir = self.get_attribute_path(
        "status", display_name, username=extracted_username
    )
    job_json_start_dir = self.get_attribute_path("running")

    status_json_name = self.get_attribute_id("status", job_id=job_id)
    job_json_name = self.get_attribute_id("job", job_id)

    # check if the job is done or had an error
    if status_msg_dict.status == "DONE":
        # test if the result dict is None
        if result_dict is None:
            raise ValueError(
                "The 'result_dict' argument cannot be None if the job is done."
            )
        result_uploaded = self.upload_result(
            result_dict, display_name, job_id, private_jwk
        )
        if not result_uploaded:
            raise ValueError("The result was not uploaded successfully.")

        # now move the job out of the running jobs into the finished jobs
        job_finished_json_dir = self.get_attribute_path(
            "finished", display_name=display_name
        )

        self.move(job_json_start_dir, job_finished_json_dir, job_json_name)

    elif status_msg_dict.status == "ERROR":
        # because there was an error, we move the job to the deleted jobs
        deleted_json_dir = self.get_attribute_path("deleted", display_name)
        self.move(job_json_start_dir, deleted_json_dir, job_json_name)

    # and create the status json file
    try:
        self.update(status_msg_dict.model_dump(), status_json_dir, status_json_name)
    except FileNotFoundError:
        # the status file should exist already; recreate it as a fallback
        logging.warning(
            "The status file was missing for %s with job_id %s.",
            display_name,
            job_id,
        )
        self.upload_status(display_name, "", status_json_name)
        self.update(status_msg_dict.model_dump(), status_json_dir, status_json_name)

upload_public_key(public_jwk, display_name, role='backend')

The function that uploads the spooler public JWK to the storage.

Parameters:

Name Type Description Default
public_jwk JWK

The JWK that contains the public key

required
display_name

The name of the backend

required
role PksStr

The role of the public key

'backend'

Returns:

Type Description
None

None

Source code in src/sqooler/storage_providers/local.py
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
def upload_public_key(
    self, public_jwk: JWK, display_name: DisplayNameStr, role: PksStr = "backend"
) -> None:
    """
    Upload the spooler public JWK to the storage.

    Args:
        public_jwk: The JWK that contains the public key
        display_name : The name of the backend
        role: The role of the public key

    Returns:
        None
    """
    # only verification keys may be published
    if not public_jwk.key_ops == "verify":
        raise ValueError("The key is not intended for verification")

    # a public upload must never contain the private part of the key
    if public_jwk.d is not None:
        raise ValueError("The key contains a private key")

    # backend keys have to match the kid announced in the backend config
    if role == "backend":
        config_dict = self.get_config(display_name)
        if public_jwk.kid != config_dict.kid:
            raise ValueError("The key does not have the correct kid.")

    # make sure the folder for the public keys exists
    key_dir = os.path.normpath(
        os.path.join(self.base_path, self.get_attribute_path("pks"))
    )
    os.makedirs(key_dir, exist_ok=True)

    # this should most likely depend on the kid at some point
    secure_path = os.path.normpath(
        os.path.join(key_dir, f"{public_jwk.kid}.json")
    )
    with open(secure_path, "w", encoding="utf-8") as json_file:
        json_file.write(public_jwk.model_dump_json())

Comments