Skip to content

API documentation of the local storage provider

The module that contains all the necessary logic for communication with the local storage providers.

LocalProvider

Bases: LocalProviderExtended

Create a file storage that works on the local machine.

Source code in src/sqooler/storage_providers/local.py
506
507
508
509
510
511
512
513
514
515
class LocalProvider(LocalProviderExtended):
    """
    A file storage provider that operates on the local machine.
    """

    def __init__(self, login_dict: LocalLoginInformation) -> None:
        """
        Prepare the necessary keys and create the client that handles all the connections.
        """
        super().__init__(login_dict, is_active=True, name="default")

__init__(login_dict)

Set up the necessary keys and create the client through which all the connections will run.

Source code in src/sqooler/storage_providers/local.py
511
512
513
514
515
def __init__(self, login_dict: LocalLoginInformation) -> None:
    """
    Prepare the necessary keys and create the client that handles all the connections.
    """
    super().__init__(login_dict, is_active=True, name="default")

LocalProviderExtended

Bases: StorageProvider

Create a file storage that works on the local machine.

Source code in src/sqooler/storage_providers/local.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
class LocalProviderExtended(StorageProvider):
    """
    Create a file storage that works on the local machine.
    """

    def __init__(
        self, login_dict: LocalLoginInformation, name: str, is_active: bool = True
    ) -> None:
        """
        Set up the necessary keys and create the client through which all the connections will run.

        Args:
            login_dict: The login dict that contains the necessary
                        information to connect to the local storage
            name: The name of the storage provider
            is_active: Is the storage provider active.

        Raises:
            ValidationError: If the login_dict is not valid
        """
        super().__init__(name, is_active)
        self.base_path = login_dict.base_path

    @validate_active
    def upload(self, content_dict: Mapping, storage_path: str, job_id: str) -> None:
        """
        Upload the file to the storage

        Args:
            content_dict: The content that should be written out as json
            storage_path: The path inside the storage, excluding the file name
            job_id: The file name without the `.json` suffix
        """
        # strip trailing and leading slashes from the storage_path
        storage_path = storage_path.strip("/")

        # json folder; create it if it does not exist yet
        folder_path = self.base_path + "/" + storage_path
        if not os.path.exists(folder_path):
            os.makedirs(folder_path)

        # create the full path
        file_name = job_id + ".json"
        full_json_path = os.path.join(folder_path, file_name)
        secure_path = os.path.normpath(full_json_path)

        with open(secure_path, "w", encoding="utf-8") as json_file:
            json.dump(content_dict, json_file)

    @validate_active
    def get_file_content(self, storage_path: str, job_id: str) -> dict:
        """
        Get the file content from the storage

        Args:
            storage_path: The path inside the storage, excluding the file name
            job_id: The file name without the `.json` suffix

        Returns:
            The json content of the file as a dict

        Raises:
            FileNotFoundError: If the file does not exist
        """
        # strip trailing and leading slashes from the storage_path
        storage_path = storage_path.strip("/")

        # create the full path
        file_name = job_id + ".json"
        full_json_path = os.path.join(self.base_path, storage_path, file_name)
        secure_path = os.path.normpath(full_json_path)

        # does the file already exist ?
        if not os.path.exists(secure_path):
            raise FileNotFoundError(
                f"The file {secure_path} does not exist and cannot be loaded."
            )
        with open(secure_path, "r", encoding="utf-8") as json_file:
            loaded_data_dict = json.load(json_file)
        return loaded_data_dict

    def get_job_content(self, storage_path: str, job_id: str) -> dict:
        """
        Get the content of the job from the storage. This is a wrapper around get_file_content
        and handles the different ways of identifying the job.

        storage_path: the path towards the file, excluding the filename / id
        job_id: the id of the file we are about to look up

        Returns:
            The content of the job
        """
        job_dict = self.get_file_content(storage_path=storage_path, job_id=job_id)
        return job_dict

    def update_file(self, content_dict: dict, storage_path: str, job_id: str) -> None:
        """
        Update the file content.

        Args:
            content_dict: The dictionary containing the new content of the file
            storage_path: The path to the file
            job_id: The id of the job

        Returns:
            None

        Raises:
            FileNotFoundError: If the file is not found
        """
        # strip trailing and leading slashes from the storage_path
        storage_path = storage_path.strip("/")

        # json folder
        file_name = job_id + ".json"
        full_json_path = os.path.join(self.base_path, storage_path, file_name)
        secure_path = os.path.normpath(full_json_path)

        # does the file already exist ?
        if not os.path.exists(secure_path):
            raise FileNotFoundError(
                f"The file {secure_path} does not exist and cannot be updated."
            )
        with open(secure_path, "w", encoding="utf-8") as json_file:
            json.dump(content_dict, json_file)

    @validate_active
    def move_file(self, start_path: str, final_path: str, job_id: str) -> None:
        """
        Move the file from `start_path` to `final_path`

        Args:
            start_path: The folder inside the storage where the file currently lives
            final_path: The folder inside the storage where the file should end up
            job_id: The file name without the `.json` suffix
        """
        start_path = start_path.strip("/")

        source_file = self.base_path + "/" + start_path + "/" + job_id + ".json"

        # the destination folder is created on demand
        final_path = self.base_path + "/" + final_path + "/"
        if not os.path.exists(final_path):
            os.makedirs(final_path)

        # Move the file
        shutil.move(source_file, final_path)

    @validate_active
    def delete_file(self, storage_path: str, job_id: str) -> None:
        """
        Delete the file from the storage

        Args:
            storage_path: the path where the file is currently stored, but excluding the file name
            job_id: the name of the file

        Returns:
            None
        """
        storage_path = storage_path.strip("/")
        source_file = self.base_path + "/" + storage_path + "/" + job_id + ".json"
        os.remove(source_file)

    @validate_active
    def get_backends(self) -> list[DisplayNameStr]:
        """
        Get a list of all the backends that the provider offers.
        """
        # path of the configs
        config_path = self.base_path + "/backends/configs"
        backend_names: list[DisplayNameStr] = []

        # If the folder does not exist, return an empty list
        if not os.path.exists(config_path):
            return backend_names

        # Get a list of all items in the folder
        all_items = os.listdir(config_path)
        # Filter out only the JSON files
        json_files = [item for item in all_items if item.endswith(".json")]

        for file_name in json_files:
            full_json_path = os.path.join(config_path, file_name)
            secure_path = os.path.normpath(full_json_path)

            with open(secure_path, "r", encoding="utf-8") as json_file:
                config_dict = json.load(json_file)
                backend_names.append(config_dict["display_name"])
        return backend_names

    def get_backend_status(
        self, display_name: DisplayNameStr
    ) -> BackendStatusSchemaOut:
        """
        Get the status of the backend. This follows the qiskit logic.

        Args:
            display_name: The name of the backend

        Returns:
            The status dict of the backend

        Raises:
            FileNotFoundError: If the backend does not exist
        """
        # path of the configs
        file_name = display_name + ".json"
        config_path = self.base_path + "/backends/configs"
        full_json_path = os.path.join(config_path, file_name)
        secure_path = os.path.normpath(full_json_path)
        with open(secure_path, "r", encoding="utf-8") as json_file:
            backend_config_dict = json.load(json_file)

        # an empty config is treated like a missing backend
        if not backend_config_dict:
            raise FileNotFoundError(
                f"The backend {display_name} does not exist for the given storageprovider."
            )

        backend_config_info = BackendConfigSchemaIn(**backend_config_dict)
        qiskit_backend_dict = self.backend_dict_to_qiskit_status(backend_config_info)
        return qiskit_backend_dict

    def upload_job(
        self, job_dict: dict, display_name: DisplayNameStr, username: str
    ) -> str:
        """
        Upload the job to the storage provider.

        Args:
            job_dict: the full job dict
            display_name: the name of the backend
            username: the name of the user that submitted the job

        Returns:
            The job id of the uploaded job.
        """

        storage_path = "jobs/queued/" + display_name
        # a short random hex id is enough to identify the job file
        job_id = (uuid.uuid4().hex)[:24]

        self.upload(content_dict=job_dict, storage_path=storage_path, job_id=job_id)
        return job_id

    def upload_status(
        self, display_name: DisplayNameStr, username: str, job_id: str
    ) -> StatusMsgDict:
        """
        This function uploads a status file to the backend and creates the status dict.

        Args:
            display_name: The name of the backend to which we want to upload the job
            username: The username of the user that is uploading the job
            job_id: The job_id of the job that we want to upload the status for

        Returns:
            The status dict of the job
        """
        storage_path = "status/" + display_name
        status_draft = {
            "job_id": job_id,
            "status": "INITIALIZING",
            "detail": "Got your json.",
            "error_message": "None",
        }

        # should we also upload the username into the dict ?
        status_dict = StatusMsgDict(**status_draft)
        # now upload the status dict
        self.upload(
            content_dict=status_dict.model_dump(),
            storage_path=storage_path,
            job_id=job_id,
        )
        return status_dict

    def get_status(
        self, display_name: DisplayNameStr, username: str, job_id: str
    ) -> StatusMsgDict:
        """
        This function gets the status file from the backend and returns the status dict.

        Args:
            display_name: The name of the backend to which we want to upload the job
            username: The username of the user that is uploading the job
            job_id: The job_id of the job that we want to upload the status for

        Returns:
            The status dict of the job
        """
        status_json_dir = "status/" + display_name

        try:
            status_dict = self.get_file_content(
                storage_path=status_json_dir, job_id=job_id
            )
            return StatusMsgDict(**status_dict)
        except FileNotFoundError:
            # if the job_id is not valid, we return an error
            return StatusMsgDict(
                job_id=job_id,
                status="ERROR",
                detail="Cannot get status",
                error_message=f"Could not find status for {display_name} with job_id {job_id}.",
            )

    def get_result(
        self, display_name: DisplayNameStr, username: str, job_id: str
    ) -> ResultDict:
        """
        This function gets the result file from the backend and returns the result dict.

        Args:
            display_name: The name of the backend to which we want to upload the job
            username: The username of the user that is uploading the job
            job_id: The job_id of the job that we want to upload the status for

        Returns:
            The result dict of the job. If the information is not available, the result dict
            has a status of "ERROR".
        """

        try:
            backend_config_info = self.get_backend_dict(display_name)
        except FileNotFoundError:
            # if the backend does not exist, we return an error
            return ResultDict(
                display_name="",
                backend_version="",
                job_id=job_id,
                qobj_id=None,
                success=False,
                status="ERROR",
                header={},
                results=[],
            )

        result_json_dir = "results/" + display_name
        try:
            result_dict = self.get_file_content(
                storage_path=result_json_dir, job_id=job_id
            )
        except FileNotFoundError:
            # if the job_id is not valid, we return an error
            return ResultDict(
                display_name=display_name,
                backend_version="",
                job_id=job_id,
                qobj_id=None,
                success=False,
                status="ERROR",
                header={},
                results=[],
            )
        result_dict["backend_name"] = backend_config_info.backend_name
        typed_result = ResultDict(**result_dict)
        return typed_result

    def upload_config(
        self, config_dict: BackendConfigSchemaIn, display_name: DisplayNameStr
    ) -> None:
        """
        The function that uploads the spooler configuration to the storage.

        Args:
            config_dict: The dictionary containing the configuration
            display_name : The name of the backend

        Returns:
            None
        """
        # path of the configs, joined and normalized in a single step
        config_path = os.path.normpath(os.path.join(self.base_path, "backends/configs"))
        # test if the config path already exists. If it does not, create it
        if not os.path.exists(config_path):
            os.makedirs(config_path)

        file_name = display_name + ".json"
        full_json_path = os.path.join(config_path, file_name)
        secure_path = os.path.normpath(full_json_path)
        with open(secure_path, "w", encoding="utf-8") as json_file:
            json_file.write(config_dict.model_dump_json())

    def get_config(self, display_name: DisplayNameStr) -> BackendConfigSchemaIn:
        """
        The function that downloads the spooler configuration to the storage.

        Args:
            display_name : The name of the backend

        Raises:
            FileNotFoundError: If the backend does not exist

        Returns:
            The configuration of the backend in complete form.
        """
        # path of the configs
        config_path = self.base_path + "/backends/configs"
        file_name = display_name + ".json"
        full_json_path = os.path.join(config_path, file_name)
        secure_path = os.path.normpath(full_json_path)
        with open(secure_path, "r", encoding="utf-8") as json_file:
            backend_config_dict = json.load(json_file)

        # an empty config is treated like a missing backend
        if not backend_config_dict:
            raise FileNotFoundError("The backend does not exist for the given storage.")

        return BackendConfigSchemaIn(**backend_config_dict)

    def update_in_database(
        self,
        result_dict: ResultDict,
        status_msg_dict: StatusMsgDict,
        job_id: str,
        display_name: DisplayNameStr,
    ) -> None:
        """
        Upload the status and result to the `StorageProvider`.

        Args:
            result_dict: the dictionary containing the result of the job
            status_msg_dict: the dictionary containing the status message of the job
            job_id: the name of the job
            display_name: the name of the backend

        Returns:
            None
        """
        job_json_start_dir = "jobs/running"
        # check if the job is done or had an error
        if status_msg_dict.status == "DONE":
            # test if the result dict is None
            if result_dict is None:
                raise ValueError(
                    "The 'result_dict' argument cannot be None if the job is done."
                )
            # let us create the result json file
            result_json_dir = "results/" + display_name
            self.upload(result_dict.model_dump(), result_json_dir, job_id)

            # now move the job out of the running jobs into the finished jobs
            job_finished_json_dir = "jobs/finished/" + display_name
            self.move_file(job_json_start_dir, job_finished_json_dir, job_id)

        elif status_msg_dict.status == "ERROR":
            # because there was an error, we move the job to the deleted jobs
            deleted_json_dir = "jobs/deleted"
            self.move_file(job_json_start_dir, deleted_json_dir, job_id)

        # and create the status json file
        status_json_dir = "status/" + display_name
        self.update_file(status_msg_dict.model_dump(), status_json_dir, job_id)

    def get_file_queue(self, storage_path: str) -> list[str]:
        """
        Get a list of files

        Args:
            storage_path: Where are we looking for the files.

        Returns:
            A list of files that was found.
        """
        # get a list of files in the folder
        full_path = self.base_path + "/" + storage_path
        # test if the path exists. Otherwise simply return an empty list
        if not os.path.exists(full_path):
            return []
        return os.listdir(full_path)

    def get_next_job_in_queue(self, display_name: str) -> NextJobSchema:
        """
        A function that obtains the next job in the queue.

        Args:
            display_name: The name of the backend

        Returns:
            the dict of the next job
        """
        queue_dir = "jobs/queued/" + display_name
        job_dict = {"job_id": 0, "job_json_path": "None"}
        job_list = self.get_file_queue(queue_dir)

        # update the time stamp of the last job
        self.timestamp_queue(display_name)

        # if there is a job, we should move it
        if job_list:
            job_json_name = job_list[0]
            # drop the ".json" suffix to obtain the job id
            job_id = job_json_name[:-5]
            job_dict["job_id"] = job_id

            # and move the file into the right directory
            self.move_file(queue_dir, "jobs/running", job_id)
            job_dict["job_json_path"] = "jobs/running"
        return NextJobSchema(**job_dict)

__init__(login_dict, name, is_active=True)

Set up the necessary keys and create the client through which all the connections will run.

Parameters:

Name Type Description Default
login_dict LocalLoginInformation

The login dict that contains the necessary information to connect to the local storage

required
name str

The name of the storage provider

required
is_active bool

Is the storage provider active.

True

Raises:

Type Description
ValidationError

If the login_dict is not valid

Source code in src/sqooler/storage_providers/local.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
def __init__(
    self, login_dict: LocalLoginInformation, name: str, is_active: bool = True
) -> None:
    """
    Set up the necessary keys and create the client through which all the connections will run.

    Args:
        login_dict: The login dict that contains the necessary
                    information to connect to the local storage
        name: The name of the storage provider
        is_active: Is the storage provider active.

    Raises:
        ValidationError: If the login_dict is not valid
    """
    super().__init__(name, is_active)
    # root directory under which all files of this provider are stored
    self.base_path = login_dict.base_path

delete_file(storage_path, job_id)

Delete the file from the storage

Parameters:

Name Type Description Default
storage_path str

the path where the file is currently stored, but excluding the file name

required
job_id str

the name of the file

required

Returns:

Type Description
None

None

Source code in src/sqooler/storage_providers/local.py
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
@validate_active
def delete_file(self, storage_path: str, job_id: str) -> None:
    """
    Delete the file from the storage

    Args:
        storage_path: the path where the file is currently stored, but excluding the file name
        job_id: the name of the file

    Returns:
        None
    """
    # normalize the folder path and remove the json file for this job
    trimmed_path = storage_path.strip("/")
    file_to_remove = self.base_path + "/" + trimmed_path + "/" + job_id + ".json"
    os.remove(file_to_remove)

get_backend_status(display_name)

Get the status of the backend. This follows the qiskit logic.

Parameters:

Name Type Description Default
display_name DisplayNameStr

The name of the backend

required

Returns:

Type Description
BackendStatusSchemaOut

The status dict of the backend

Raises:

Type Description
FileNotFoundError

If the backend does not exist

Source code in src/sqooler/storage_providers/local.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
def get_backend_status(
    self, display_name: DisplayNameStr
) -> BackendStatusSchemaOut:
    """
    Get the status of the backend. This follows the qiskit logic.

    Args:
        display_name: The name of the backend

    Returns:
        The status dict of the backend

    Raises:
        FileNotFoundError: If the backend does not exist
    """
    # locate the config file of the requested backend
    config_file = os.path.normpath(
        os.path.join(self.base_path + "/backends/configs", display_name + ".json")
    )
    with open(config_file, "r", encoding="utf-8") as config_stream:
        raw_config = json.load(config_stream)

    # an empty config means the backend is unknown
    if not raw_config:
        raise FileNotFoundError(
            f"The backend {display_name} does not exist for the given storageprovider."
        )

    # validate the raw config and translate it into the qiskit status format
    typed_config = BackendConfigSchemaIn(**raw_config)
    return self.backend_dict_to_qiskit_status(typed_config)

get_backends()

Get a list of all the backends that the provider offers.

Source code in src/sqooler/storage_providers/local.py
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
@validate_active
def get_backends(self) -> list[DisplayNameStr]:
    """
    Get a list of all the backends that the provider offers.
    """
    # folder where the backend configs live
    config_dir = self.base_path + "/backends/configs"
    names: list[DisplayNameStr] = []

    # without the folder there are no backends to report
    if not os.path.exists(config_dir):
        return names

    # keep only the json config files among the folder entries
    config_files = [
        entry for entry in os.listdir(config_dir) if entry.endswith(".json")
    ]

    # read the display name out of every config file
    for config_file in config_files:
        safe_path = os.path.normpath(os.path.join(config_dir, config_file))

        with open(safe_path, "r", encoding="utf-8") as handle:
            names.append(json.load(handle)["display_name"])
    return names

get_config(display_name)

The function that downloads the spooler configuration to the storage.

Parameters:

Name Type Description Default
display_name

The name of the backend

required

Raises:

Type Description
FileNotFoundError

If the backend does not exist

Returns:

Type Description
BackendConfigSchemaIn

The configuration of the backend in complete form.

Source code in src/sqooler/storage_providers/local.py
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
def get_config(self, display_name: DisplayNameStr) -> BackendConfigSchemaIn:
    """
    The function that downloads the spooler configuration to the storage.

    Args:
        display_name : The name of the backend

    Raises:
        FileNotFoundError: If the backend does not exist

    Returns:
        The configuration of the backend in complete form.
    """
    # assemble the path of the config file for this backend
    config_file = os.path.normpath(
        os.path.join(self.base_path + "/backends/configs", display_name + ".json")
    )
    with open(config_file, "r", encoding="utf-8") as config_stream:
        raw_config = json.load(config_stream)

    # an empty config is treated as a missing backend
    if not raw_config:
        raise FileNotFoundError("The backend does not exist for the given storage.")

    return BackendConfigSchemaIn(**raw_config)

get_file_content(storage_path, job_id)

Get the file content from the storage

Source code in src/sqooler/storage_providers/local.py
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
@validate_active
def get_file_content(self, storage_path: str, job_id: str) -> dict:
    """
    Get the file content from the storage
    """
    # normalize the storage path and assemble the full file location
    trimmed_path = storage_path.strip("/")
    json_path = os.path.normpath(
        os.path.join(self.base_path, trimmed_path, job_id + ".json")
    )

    # fail early if there is nothing to load
    if not os.path.exists(json_path):
        raise FileNotFoundError(
            f"The file {json_path} does not exist and cannot be loaded."
        )
    with open(json_path, "r", encoding="utf-8") as json_stream:
        return json.load(json_stream)

get_file_queue(storage_path)

Get a list of files

Parameters:

Name Type Description Default
storage_path str

Where are we looking for the files.

required

Returns:

Type Description
list[str]

A list of files that was found.

Source code in src/sqooler/storage_providers/local.py
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
def get_file_queue(self, storage_path: str) -> list[str]:
    """
    Get a list of files

    Args:
        storage_path: Where are we looking for the files.

    Returns:
        A list of files that was found.
    """
    # folder whose entries we want to list
    queue_folder = self.base_path + "/" + storage_path
    # a missing folder simply means an empty queue
    if not os.path.exists(queue_folder):
        return []
    return os.listdir(queue_folder)

get_job_content(storage_path, job_id)

Get the content of the job from the storage. This is a wrapper around get_file_content and handles the different ways of identifying the job.

storage_path: the path towards the file, excluding the filename / id job_id: the id of the file we are about to look up

Returns:

Type Description
dict

The content of the job

Source code in src/sqooler/storage_providers/local.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def get_job_content(self, storage_path: str, job_id: str) -> dict:
    """
    Get the content of the job from the storage. This is a wrapper around get_file_content
    and handles the different ways of identifying the job.

    storage_path: the path towards the file, excluding the filename / id
    job_id: the id of the file we are about to look up

    Returns:
        The content of the job
    """
    # simply delegate to the generic json file loader
    return self.get_file_content(storage_path=storage_path, job_id=job_id)

get_next_job_in_queue(display_name)

A function that obtains the next job in the queue.

Parameters:

Name Type Description Default
display_name str

The name of the backend

required

Returns:

Type Description
NextJobSchema

the dict of the next job

Source code in src/sqooler/storage_providers/local.py
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
def get_next_job_in_queue(self, display_name: str) -> NextJobSchema:
    """
    A function that obtains the next job in the queue.

    Args:
        display_name: The name of the backend

    Returns:
        the dict of the next job
    """
    queue_folder = "jobs/queued/" + display_name
    next_job = {"job_id": 0, "job_json_path": "None"}
    queued_files = self.get_file_queue(queue_folder)

    # update the time stamp of the last job
    self.timestamp_queue(display_name)

    # take the first queued file, if any, and move it to the running folder
    if queued_files:
        # drop the ".json" suffix to obtain the job id
        job_id = queued_files[0][:-5]
        next_job["job_id"] = job_id

        self.move_file(queue_folder, "jobs/running", job_id)
        next_job["job_json_path"] = "jobs/running"
    return NextJobSchema(**next_job)

get_result(display_name, username, job_id)

This function gets the result file from the backend and returns the result dict.

Parameters:

Name Type Description Default
display_name DisplayNameStr

The name of the backend to which we want to upload the job

required
username str

The username of the user that is uploading the job

required
job_id str

The job_id of the job that we want to upload the status for

required

Returns:

Type Description
ResultDict

The result dict of the job. If the information is not available, the result dict

ResultDict

has a status of "ERROR".

Source code in src/sqooler/storage_providers/local.py
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
def get_result(
    self, display_name: DisplayNameStr, username: str, job_id: str
) -> ResultDict:
    """
    This function gets the result file from the backend and returns the result dict.

    Args:
        display_name: The name of the backend to which we want to upload the job
        username: The username of the user that is uploading the job
        job_id: The job_id of the job that we want to upload the status for

    Returns:
        The result dict of the job. If the information is not available, the result dict
        has a status of "ERROR".
    """

    def _error_result(backend_name: str) -> ResultDict:
        # Both failure modes (unknown backend, unknown job id) produce the
        # same error payload; only the reportable display name differs.
        return ResultDict(
            display_name=backend_name,
            backend_version="",
            job_id=job_id,
            qobj_id=None,
            success=False,
            status="ERROR",
            header={},
            results=[],
        )

    try:
        backend_config_info = self.get_backend_dict(display_name)
    except FileNotFoundError:
        # the backend does not exist, so no display name can be reported
        return _error_result("")

    result_json_dir = "results/" + display_name
    try:
        result_dict = self.get_file_content(
            storage_path=result_json_dir, job_id=job_id
        )
    except FileNotFoundError:
        # the backend exists but the job_id is not valid
        return _error_result(display_name)

    result_dict["backend_name"] = backend_config_info.backend_name
    return ResultDict(**result_dict)

get_status(display_name, username, job_id)

This function gets the status file from the backend and returns the status dict.

Parameters:

Name Type Description Default
display_name DisplayNameStr

The name of the backend to which we want to upload the job

required
username str

The username of the user that is uploading the job

required
job_id str

The job_id of the job that we want to upload the status for

required

Returns:

Type Description
StatusMsgDict

The status dict of the job

Source code in src/sqooler/storage_providers/local.py
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
def get_status(
    self, display_name: DisplayNameStr, username: str, job_id: str
) -> StatusMsgDict:
    """
    Fetch the status file of a job from the backend and return the status dict.

    Args:
        display_name: The name of the backend to which we want to upload the job
        username: The username of the user that is uploading the job
        job_id: The job_id of the job that we want to upload the status for

    Returns:
        The status dict of the job
    """
    status_json_dir = "status/" + display_name

    try:
        raw_status = self.get_file_content(
            storage_path=status_json_dir, job_id=job_id
        )
    except FileNotFoundError:
        # no status file for this job id, so we report an error status
        return StatusMsgDict(
            job_id=job_id,
            status="ERROR",
            detail="Cannot get status",
            error_message=f"Could not find status for {display_name} with job_id {job_id}.",
        )
    return StatusMsgDict(**raw_status)

move_file(start_path, final_path, job_id)

Move the file from start_path to final_path

Source code in src/sqooler/storage_providers/local.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
@validate_active
def move_file(self, start_path: str, final_path: str, job_id: str) -> None:
    """
    Move the json file of the job `job_id` from `start_path` to `final_path`.
    The destination folder is created if it does not exist yet.
    """
    clean_start = start_path.strip("/")

    source_file = f"{self.base_path}/{clean_start}/{job_id}.json"

    destination_dir = f"{self.base_path}/{final_path}/"
    # make sure the destination folder exists before moving
    os.makedirs(destination_dir, exist_ok=True)

    shutil.move(source_file, destination_dir)

update_file(content_dict, storage_path, job_id)

Update the file content.

Parameters:

Name Type Description Default
content_dict dict

The dictionary containing the new content of the file

required
storage_path str

The path to the file

required
job_id str

The id of the job

required

Returns:

Type Description
None

None

Raises:

Type Description
FileNotFoundError

If the file is not found

Source code in src/sqooler/storage_providers/local.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
def update_file(self, content_dict: dict, storage_path: str, job_id: str) -> None:
    """
    Overwrite the content of an existing json file.

    Args:
        content_dict: The dictionary containing the new content of the file
        storage_path: The path to the file
        job_id: The id of the job

    Returns:
        None

    Raises:
        FileNotFoundError: If the file is not found
    """
    # remove surrounding slashes so the path pieces join predictably
    clean_path = storage_path.strip("/")

    # NOTE(review): normpath cleans the path but does not stop ".." from
    # escaping base_path -- confirm callers never pass untrusted paths.
    target = os.path.normpath(
        os.path.join(self.base_path, clean_path, job_id + ".json")
    )

    # only existing files may be updated; file creation goes through `upload`
    if not os.path.exists(target):
        raise FileNotFoundError(
            f"The file {target} does not exist and cannot be updated."
        )
    with open(target, "w", encoding="utf-8") as json_file:
        json.dump(content_dict, json_file)

update_in_database(result_dict, status_msg_dict, job_id, display_name)

Upload the status and result to the StorageProvider.

Parameters:

Name Type Description Default
result_dict ResultDict

the dictionary containing the result of the job

required
status_msg_dict StatusMsgDict

the dictionary containing the status message of the job

required
job_id str

the name of the job

required
display_name DisplayNameStr

the name of the backend

required

Returns:

Type Description
None

None

Source code in src/sqooler/storage_providers/local.py
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
def update_in_database(
    self,
    result_dict: ResultDict,
    status_msg_dict: StatusMsgDict,
    job_id: str,
    display_name: DisplayNameStr,
) -> None:
    """
    Upload the status and result to the `StorageProvider`.

    Args:
        result_dict: the dictionary containing the result of the job
        status_msg_dict: the dictionary containing the status message of the job
        job_id: the name of the job
        display_name: the name of the backend

    Returns:
        None
    """
    running_dir = "jobs/running"
    status = status_msg_dict.status

    if status == "DONE":
        # a finished job must come with a result payload
        if result_dict is None:
            raise ValueError(
                "The 'result_dict' argument cannot be None if the job is done."
            )
        # persist the result json ...
        self.upload(result_dict.model_dump(), "results/" + display_name, job_id)
        # ... and archive the job file under the finished jobs
        self.move_file(running_dir, "jobs/finished/" + display_name, job_id)
    elif status == "ERROR":
        # failed jobs are parked in the deleted folder
        self.move_file(running_dir, "jobs/deleted", job_id)

    # the status file is refreshed regardless of the outcome
    self.update_file(status_msg_dict.model_dump(), "status/" + display_name, job_id)

upload(content_dict, storage_path, job_id)

Upload the file to the storage

Source code in src/sqooler/storage_providers/local.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
@validate_active
def upload(self, content_dict: Mapping, storage_path: str, job_id: str) -> None:
    """
    Store `content_dict` as the json file `<job_id>.json` under `storage_path`.
    """
    # remove surrounding slashes so the path pieces concatenate cleanly
    clean_path = storage_path.strip("/")

    # make sure the target folder exists
    folder_path = self.base_path + "/" + clean_path
    os.makedirs(folder_path, exist_ok=True)

    # build and normalize the full path of the json file
    target = os.path.normpath(os.path.join(folder_path, job_id + ".json"))

    with open(target, "w", encoding="utf-8") as json_file:
        json.dump(content_dict, json_file)

upload_config(config_dict, display_name)

The function that uploads the spooler configuration to the storage.

Parameters:

Name Type Description Default
config_dict BackendConfigSchemaIn

The dictionary containing the configuration

required
display_name DisplayNameStr

The name of the backend

required

Returns:

Type Description
None

None

Source code in src/sqooler/storage_providers/local.py
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
def upload_config(
    self, config_dict: BackendConfigSchemaIn, display_name: DisplayNameStr
) -> None:
    """
    The function that uploads the spooler configuration to the storage.

    Args:
        config_dict: The dictionary containing the configuration
        display_name: The name of the backend

    Returns:
        None
    """
    # normalized path of the folder that holds all backend configs
    # (previously computed twice in a row; once is enough)
    config_path = os.path.normpath(os.path.join(self.base_path, "backends/configs"))
    # test if the config path already exists. If it does not, create it
    if not os.path.exists(config_path):
        os.makedirs(config_path)

    file_name = display_name + ".json"
    secure_path = os.path.normpath(os.path.join(config_path, file_name))
    with open(secure_path, "w", encoding="utf-8") as json_file:
        json_file.write(config_dict.model_dump_json())

upload_job(job_dict, display_name, username)

Upload the job to the storage provider.

Parameters:

Name Type Description Default
job_dict dict

the full job dict

required
display_name DisplayNameStr

the name of the backend

required
username str

the name of the user that submitted the job

required

Returns:

Type Description
str

The job id of the uploaded job.

Source code in src/sqooler/storage_providers/local.py
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
def upload_job(
    self, job_dict: dict, display_name: DisplayNameStr, username: str
) -> str:
    """
    Upload the job to the storage provider.

    Args:
        job_dict: the full job dict
        display_name: the name of the backend
        username: the name of the user that submitted the job

    Returns:
        The job id of the uploaded job.
    """
    # a fresh 24-character hex identifier for the job
    job_id = uuid.uuid4().hex[:24]
    queue_path = "jobs/queued/" + display_name

    self.upload(content_dict=job_dict, storage_path=queue_path, job_id=job_id)
    return job_id

upload_status(display_name, username, job_id)

This function uploads a status file to the backend and creates the status dict.

Parameters:

Name Type Description Default
display_name DisplayNameStr

The name of the backend to which we want to upload the job

required
username str

The username of the user that is uploading the job

required
job_id str

The job_id of the job that we want to upload the status for

required

Returns:

Type Description
StatusMsgDict

The status dict of the job

Source code in src/sqooler/storage_providers/local.py
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
def upload_status(
    self, display_name: DisplayNameStr, username: str, job_id: str
) -> StatusMsgDict:
    """
    This function uploads a status file to the backend and creates the status dict.

    Args:
        display_name: The name of the backend to which we want to upload the job
        username: The username of the user that is uploading the job
        job_id: The job_id of the job that we want to upload the status for

    Returns:
        The status dict of the job
    """
    # a freshly submitted job always starts in the INITIALIZING state
    status_dict = StatusMsgDict(
        job_id=job_id,
        status="INITIALIZING",
        detail="Got your json.",
        error_message="None",
    )

    # should we also upload the username into the dict ?
    self.upload(
        content_dict=status_dict.model_dump(),
        storage_path="status/" + display_name,
        job_id=job_id,
    )
    return status_dict

Comments