API documentation of spoolers

This module contains the code for the Spooler classes and their helpers. It is mainly meant to be deployed on the back-end by people who would like to perform calculations and work through the job queue.

The main class is the Spooler class, which is used for the simulators. The LabscriptSpooler class is a specialized version of the Spooler class that allows us to execute jobs in labscript directly.

BaseSpooler

Bases: ABC

Abstract base class for spoolers. They are the main logic of the back-end.

Attributes:

Name Type Description
ins_schema_dict

A dictionary that contains all the allowed instructions for this spooler.

device_config

A model class (typed as Type[BaseModel] in the constructor) that holds the main config params of the experiment and is used to validate experiments.

n_wires

maximum number of wires for the spooler

n_max_shots

the maximum number of shots for the spooler

version

the version of the backend

cold_atom_type

the type of cold atom that is used in the experiment

n_max_experiments

the maximum number of experiments that can be executed

wire_order

the order of the wires

num_species

the number of atomic species in the experiment

sign

whether to sign the results of the job

Source code in src/sqooler/spoolers.py
class BaseSpooler(ABC):
    """
    Abstract base class for spoolers. They are the main logic of the back-end.

    Attributes:
        ins_schema_dict : A dictionary that contains all the allowed instructions for this spooler.
        device_config: A model class that holds the main config params of the experiment.
        n_wires: maximum number of wires for the spooler
        n_max_shots: the maximum number of shots for the spooler
        version: the version of the backend
        cold_atom_type: the type of cold atom that is used in the experiment
        n_max_experiments: the maximum number of experiments that can be executed
        wire_order: the order of the wires
        num_species: the number of atomic species in the experiment
        sign: whether to sign the results of the job
    """

    def __init__(
        self,
        ins_schema_dict: dict,
        device_config: Type[BaseModel],
        n_wires: int,
        description: str = "",
        n_max_shots: int = 1000,
        version: str = "0.0.1",
        cold_atom_type: ColdAtomStr = "spin",
        n_max_experiments: int = 15,
        wire_order: WireOrderStr = "interleaved",
        num_species: int = 1,
        sign: bool = False,
    ):
        """
        The constructor of the class.
        """
        self.ins_schema_dict = ins_schema_dict
        self.device_config = device_config
        self.n_max_shots = n_max_shots
        self.n_wires = n_wires
        self.description = description
        self.version = version
        self.cold_atom_type = cold_atom_type
        self.n_max_experiments = n_max_experiments
        self.wire_order = wire_order
        self.num_species = num_species
        self._display_name: str = ""
        self.sign = sign

    def check_experiment(self, exper_dict: dict) -> tuple[str, bool]:
        """
        Check the validity of the experiment. It checks if the instructions are valid
        based on the device configuration of the spooler.

        Args:
            exper_dict: The dictionary that contains the logic and should
                be verified.

        Returns:
            str: The error message
            bool: Is the experiment ok ?
        """
        try:
            self.device_config(**exper_dict)
            return "", True
        except ValidationError as err:
            return str(err), False

    def get_configuration(self) -> BackendConfigSchemaIn:
        """
        Sends back the configuration dictionary of the spooler.

        This creates the configuration of the Spooler. However, it does not contain
        any information about the operational status. This is not really connected to
        the machine but much rather to the queue etc. So items of the `BackendConfigSchemaIn`
        like `operational` or `last_queue_check` are not set here and keep their default values.

        Returns:
            The configuration dictionary of the spooler.
        """
        gate_list = []
        for _, ins_obj in self.ins_schema_dict.items():
            if "is_gate" in ins_obj.model_fields:
                gate_list.append(ins_obj.config_dict())
        backend_config_dict = {
            "description": self.description,
            "version": self.version,
            "cold_atom_type": self.cold_atom_type,
            "gates": gate_list,
            "max_experiments": self.n_max_experiments,
            "max_shots": self.n_max_shots,
            "simulator": True,
            "supported_instructions": list(self.ins_schema_dict.keys()),
            "num_wires": self.n_wires,
            "wire_order": self.wire_order,
            "num_species": self.num_species,
            "display_name": self.display_name,
            "sign": self.sign,
        }
        return BackendConfigSchemaIn(**backend_config_dict)

    def check_instructions(self, ins_list: list) -> tuple[str, bool]:
        """
        Check all the instructions to make sure that they are valid and part
        of the allowed instructions.

        Args:
            ins_list: The list of instructions that should be checked.

        Returns:
            str: The error message
            bool: Are the instructions ok ?
        """
        err_code = ""
        exp_ok = False
        # first check that we actually have any instructions saved in the ins_schema_dict
        if len(self.ins_schema_dict) == 0:
            err_code = "No instructions allowed. Add instructions to the spooler."
            exp_ok = False
            return err_code, exp_ok

        for ins in ins_list:
            try:
                gate_instr = gate_dict_from_list(ins)
                # see if the instruction is part of the allowed instructions
                if gate_instr.name not in self.ins_schema_dict.keys():
                    err_code = f"Instruction {gate_instr.name} not allowed."
                    exp_ok = False
                    return err_code, exp_ok

                # now verify that the parameters are ok
                gate_dict = gate_instr.model_dump()
                self.ins_schema_dict[gate_instr.name](**gate_dict)
                exp_ok = True
            except ValidationError as err:
                err_code = "Error in instruction " + str(err)
                exp_ok = False
            if not exp_ok:
                break
        return err_code, exp_ok

    def check_dimension(self, json_dict: dict) -> tuple[str, bool]:
        """
        Make sure that the Hilbert space dimension is not too large.

        It can be implemented in the class that inherits, but it is not necessary.
        So this is only a placeholder.

        Args:
            json_dict: the dictionary with the instructions

        Returns:
            str: the error message
            bool: is the dimension ok ?
        """
        # pylint: disable=W0613
        return "", True

    def check_json_dict(
        self, json_dict: dict[str, dict]
    ) -> tuple[str, bool, dict[str, ExperimentalInputDict]]:
        """
        Check if the json file has the appropriate syntax.

        Args:
            json_dict (dict): the dictionary that we will test.

        Returns:
            str: the error message
            bool: does the expression have the appropriate syntax ?
            dict: the cleaned dictionary with proper typing
        """
        err_code = "No instructions received."
        exp_ok = False
        clean_dict: dict[str, ExperimentalInputDict] = {}
        for expr in json_dict:
            err_code = "Wrong experiment name or too many experiments"
            # test the name of the experiment
            if not expr.startswith("experiment_"):
                err_code = "Experiment name must start with experiment_"
                exp_ok = False
                break
            if not expr[11:].isdigit():
                err_code = "Experiment name must end with a number"
                exp_ok = False
                break
            if int(expr[11:]) > self.n_max_experiments:
                err_code = f"Experiment number too high. Must be less than {self.n_max_experiments}"
                exp_ok = False
                break
            exp_ok = True

            # test the structure of the experiment
            err_code, exp_ok = self.check_experiment(json_dict[expr])
            if not exp_ok:
                break
            # time to check the structure of the instructions
            ins_list = json_dict[expr]["instructions"]
            err_code, exp_ok = self.check_instructions(ins_list)
            if not exp_ok:
                break
            clean_dict[expr] = self.get_exp_input_dict(json_dict[expr])
        return err_code.replace("\n", ".."), exp_ok, clean_dict

    def _prep_job(
        self, json_dict: dict, job_id: str
    ) -> tuple[ResultDict, StatusMsgDict, dict]:
        """
        Prepare the job for execution. It checks the json file and prepares the
        result and status dictionaries.

        Args:
            json_dict: The dictionary with the instructions.
            job_id: The id of the job.

        Returns:
            result_dict: The dictionary with the results of the job.
            status_msg_dict: The status dictionary of the job.
            clean_dict: The cleaned dictionary with the instructions.
        """
        status_msg_dict = get_init_status()
        status_msg_dict.job_id = job_id

        result_dict = ResultDict(
            display_name=self.display_name,
            backend_version=self.version,
            job_id=job_id,
            status="INITIALIZING",
        )

        result_dict.results = []  # this simply helps pylint to understand the code

        # check that the json_dict is indeed well behaved
        err_msg, json_is_fine, clean_dict = self.check_json_dict(json_dict)

        if not json_is_fine:
            status_msg_dict.detail += (
                "; Failed json sanity check. File will be deleted. Error message : "
                + err_msg
            )
            status_msg_dict.error_message += (
                "; Failed json sanity check. File will be deleted. Error message : "
                + err_msg
            )
            status_msg_dict.status = "ERROR"
            logging.error(
                "Error in json compatibility test.",
                extra={"error_message": status_msg_dict.error_message},
            )
            return result_dict, status_msg_dict, clean_dict

        # now we need to check the dimensionality of the experiment
        dim_err_msg, dim_ok = self.check_dimension(json_dict)
        if not dim_ok:
            status_msg_dict.detail += (
                "; Failed dimensionality test. Too many atoms. File will be deleted. Error message : "
                + dim_err_msg
            )
            status_msg_dict.error_message += (
                "; Failed dimensionality test. Too many atoms. File will be deleted. Error message :  "
                + dim_err_msg
            )
            status_msg_dict.status = "ERROR"
            logging.error(
                "Error in dimensionality test.",
                extra={"error_message": status_msg_dict.error_message},
            )
            return result_dict, status_msg_dict, clean_dict

        return result_dict, status_msg_dict, clean_dict

    @property
    def display_name(self) -> str:
        """
        The name of the spooler.
        """
        return self._display_name

    @display_name.setter
    def display_name(self, value: str) -> None:
        if isinstance(value, str):  # Check if the provided value is a string
            self._display_name = value
        else:
            raise ValueError("display_name must be a string")

    def get_exp_input_dict(self, json_dict: dict) -> ExperimentalInputDict:
        """
        Transforms the dictionary into an ExperimentalInputDict object.

        Args:
            json_dict: The dictionary that should be transformed.

        Returns:
            An ExperimentalInputDict object.
        """
        raw_ins_list = json_dict["instructions"]
        ins_list = [gate_dict_from_list(instr) for instr in raw_ins_list]
        # seed is optional and defaults to None
        exp_info = ExperimentalInputDict(
            instructions=ins_list,
            shots=json_dict["shots"],
            wire_order=json_dict["wire_order"],
            num_wires=json_dict["num_wires"],
            seed=json_dict.get("seed", None),
        )
        return exp_info

    def get_private_jwk(self) -> JWK:
        """
        Get the private JWK for the spooler.

        Returns:
            The private JWK for the spooler.

        Raises:
            ValueError: If the private JWK is not set, is empty, or is invalid.
        """
        private_jwk_str = config("PRIVATE_JWK_STR", default=None)
        if private_jwk_str == "":
            logging.error("PRIVATE_JWK_STR must not be empty.")

            raise ValueError("PRIVATE_JWK_STR must not be empty.")

        if private_jwk_str is None:
            logging.error("PRIVATE_JWK_STR is not set and available.")
            raise ValueError("PRIVATE_JWK_STR is not set and available.")
        try:
            return jwk_from_config_str(private_jwk_str)
        except BinasciiError as bin_err:
            logging.error("PRIVATE_JWK_STR is invalid.")
            raise ValueError("PRIVATE_JWK_STR is invalid.") from bin_err

display_name: str property writable

The name of the spooler.

__init__(ins_schema_dict, device_config, n_wires, description='', n_max_shots=1000, version='0.0.1', cold_atom_type='spin', n_max_experiments=15, wire_order='interleaved', num_species=1, sign=False)

The constructor of the class.

Source code in src/sqooler/spoolers.py
def __init__(
    self,
    ins_schema_dict: dict,
    device_config: Type[BaseModel],
    n_wires: int,
    description: str = "",
    n_max_shots: int = 1000,
    version: str = "0.0.1",
    cold_atom_type: ColdAtomStr = "spin",
    n_max_experiments: int = 15,
    wire_order: WireOrderStr = "interleaved",
    num_species: int = 1,
    sign: bool = False,
):
    """
    The constructor of the class.
    """
    self.ins_schema_dict = ins_schema_dict
    self.device_config = device_config
    self.n_max_shots = n_max_shots
    self.n_wires = n_wires
    self.description = description
    self.version = version
    self.cold_atom_type = cold_atom_type
    self.n_max_experiments = n_max_experiments
    self.wire_order = wire_order
    self.num_species = num_species
    self._display_name: str = ""
    self.sign = sign

check_dimension(json_dict)

Make sure that the Hilbert space dimension is not too large.

It can be implemented in the class that inherits, but it is not necessary. So this is only a placeholder.

Parameters:

Name Type Description Default
json_dict dict

the dictionary with the instructions

required

Returns:

Name Type Description
str str

the error message

bool bool

is the dimension ok ?
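
Because the base implementation always returns an empty message and True, a spooler that simulates large systems may want to override it. The sketch below is not part of sqooler: `StubSpooler` stands in for `BaseSpooler` so that the code runs on its own, and the `num_wires` key, the two levels per wire, and the `MAX_HILBERT_DIM` limit are assumptions chosen for illustration.

```python
MAX_HILBERT_DIM = 2**10  # hypothetical limit, not a sqooler constant


class StubSpooler:
    """Stand-in for BaseSpooler so that the sketch runs without sqooler."""

    def check_dimension(self, json_dict: dict) -> tuple[str, bool]:
        # same placeholder behaviour as the base class
        return "", True


class SmallSimulatorSpooler(StubSpooler):
    """Hypothetical subclass that rejects experiments that are too large."""

    def check_dimension(self, json_dict: dict) -> tuple[str, bool]:
        for exp_name, exp in json_dict.items():
            # assumption: two levels per wire, so the dimension is 2**num_wires
            dim = 2 ** exp["num_wires"]
            if dim > MAX_HILBERT_DIM:
                return f"{exp_name}: dimension {dim} exceeds {MAX_HILBERT_DIM}", False
        return "", True
```

The override keeps the return convention of the base class, an error message followed by a flag, so it can be used wherever the placeholder is called.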

Source code in src/sqooler/spoolers.py
def check_dimension(self, json_dict: dict) -> tuple[str, bool]:
    """
    Make sure that the Hilbert space dimension is not too large.

    It can be implemented in the class that inherits, but it is not necessary.
    So this is only a placeholder.

    Args:
        json_dict: the dictionary with the instructions

    Returns:
        str: the error message
        bool: is the dimension ok ?
    """
    # pylint: disable=W0613
    return "", True

check_experiment(exper_dict)

Check the validity of the experiment. It checks if the instructions are valid based on the device configuration of the spooler.

Parameters:

Name Type Description Default
exper_dict dict

The dictionary that contains the logic and should be verified.

required

Returns:

Name Type Description
str str

The error message

bool bool

Is the experiment ok ?
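
The method validates by construction: the experiment dictionary is unpacked into the `device_config` class, and a validation failure is turned into an error message and a False flag instead of an exception. A minimal sketch of that pattern, assuming a hand-written stand-in in place of the real model class (`DemoExperiment`, `DemoValidationError` and the limits are hypothetical and not part of sqooler):

```python
class DemoValidationError(ValueError):
    """Stand-in for the ValidationError raised by the real model class."""


class DemoExperiment:
    """Hypothetical stand-in for a device_config class."""

    def __init__(self, instructions: list, shots: int, num_wires: int):
        if not 1 <= shots <= 1000:
            raise DemoValidationError("shots must be between 1 and 1000")
        if not 1 <= num_wires <= 4:
            raise DemoValidationError("num_wires must be between 1 and 4")
        self.instructions = instructions
        self.shots = shots
        self.num_wires = num_wires


def check_experiment(device_config: type, exper_dict: dict) -> tuple[str, bool]:
    """Return ("", True) if the class accepts the dictionary, else (message, False)."""
    try:
        device_config(**exper_dict)
        return "", True
    except DemoValidationError as err:
        return str(err), False


good = {"instructions": [], "shots": 10, "num_wires": 2}
bad = {"instructions": [], "shots": 5000, "num_wires": 2}
print(check_experiment(DemoExperiment, good))
print(check_experiment(DemoExperiment, bad))
```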

Source code in src/sqooler/spoolers.py
def check_experiment(self, exper_dict: dict) -> tuple[str, bool]:
    """
    Check the validity of the experiment. It checks if the instructions are valid
    based on the device configuration of the spooler.

    Args:
        exper_dict: The dictionary that contains the logic and should
            be verified.

    Returns:
        str: The error message
        bool: Is the experiment ok ?
    """
    try:
        self.device_config(**exper_dict)
        return "", True
    except ValidationError as err:
        return str(err), False

check_instructions(ins_list)

Check all the instructions to make sure that they are valid and part of the allowed instructions.

Parameters:

Name Type Description Default
ins_list list

The list of instructions that should be checked.

required

Returns:

Name Type Description
str str

The error message

bool bool

Are the instructions ok ?
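
Each instruction passes through two stages: its name must be a key of `ins_schema_dict`, and its parameters must then be accepted by the schema registered under that name. The sketch below mirrors this flow with plain functions as schemas. The `[name, wires, params]` layout of an instruction and the `rx` gate are assumptions made for illustration; the real method relies on `gate_dict_from_list` and model classes instead.

```python
from typing import Callable


def check_rx(wires: list, params: list) -> None:
    """Hypothetical schema: exactly one wire and one angle."""
    if len(wires) != 1 or len(params) != 1:
        raise ValueError("rx needs exactly one wire and one parameter")


# maps instruction names to their validators, playing the role of ins_schema_dict
schemas: dict[str, Callable[[list, list], None]] = {"rx": check_rx}


def check_instructions(ins_list: list) -> tuple[str, bool]:
    if len(schemas) == 0:
        return "No instructions allowed. Add instructions to the spooler.", False
    ok = False  # as in the source, an empty instruction list is reported as not ok
    for name, wires, params in ins_list:
        # stage 1: the instruction must be on the allow-list
        if name not in schemas:
            return f"Instruction {name} not allowed.", False
        # stage 2: the parameters must pass the schema of that instruction
        try:
            schemas[name](wires, params)
            ok = True
        except ValueError as err:
            return "Error in instruction " + str(err), False
    return "", ok
```

The first failing instruction ends the check, so the returned message always refers to a single instruction.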

Source code in src/sqooler/spoolers.py
def check_instructions(self, ins_list: list) -> tuple[str, bool]:
    """
    Check all the instructions to make sure that they are valid and part
    of the allowed instructions.

    Args:
        ins_list: The list of instructions that should be checked.

    Returns:
        str: The error message
        bool: Are the instructions ok ?
    """
    err_code = ""
    exp_ok = False
    # first check that we actually have any instructions saved in the ins_schema_dict
    if len(self.ins_schema_dict) == 0:
        err_code = "No instructions allowed. Add instructions to the spooler."
        exp_ok = False
        return err_code, exp_ok

    for ins in ins_list:
        try:
            gate_instr = gate_dict_from_list(ins)
            # see if the instruction is part of the allowed instructions
            if gate_instr.name not in self.ins_schema_dict.keys():
                err_code = f"Instruction {gate_instr.name} not allowed."
                exp_ok = False
                return err_code, exp_ok

            # now verify that the parameters are ok
            gate_dict = gate_instr.model_dump()
            self.ins_schema_dict[gate_instr.name](**gate_dict)
            exp_ok = True
        except ValidationError as err:
            err_code = "Error in instruction " + str(err)
            exp_ok = False
        if not exp_ok:
            break
    return err_code, exp_ok

check_json_dict(json_dict)

Check if the json file has the appropriate syntax.

Parameters:

Name Type Description Default
json_dict dict

the dictionary that we will test.

required

Returns:

Name Type Description
str str

the error message

bool bool

does the expression have the appropriate syntax ?

dict dict[str, ExperimentalInputDict]

the cleaned dictionary with proper typing
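
The first part of the check concerns only the keys of the job dictionary, which must follow the `experiment_<number>` pattern. The sketch below isolates that naming rule as a standalone function. `check_experiment_name` is a hypothetical helper, not a sqooler function, and it reproduces the comparisons of the source as written.

```python
def check_experiment_name(name: str, n_max_experiments: int = 15) -> tuple[str, bool]:
    """Apply the naming rule used for the keys of the job dictionary."""
    prefix = "experiment_"
    if not name.startswith(prefix):
        return "Experiment name must start with experiment_", False
    suffix = name[len(prefix):]
    if not suffix.isdigit():
        return "Experiment name must end with a number", False
    # the source compares with ">", so the index n_max_experiments itself passes
    if int(suffix) > n_max_experiments:
        return "Experiment number too high.", False
    return "", True
```

With the default limit of 15, `experiment_15` is still accepted while `experiment_16` is rejected.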

Source code in src/sqooler/spoolers.py
def check_json_dict(
    self, json_dict: dict[str, dict]
) -> tuple[str, bool, dict[str, ExperimentalInputDict]]:
    """
    Check if the json file has the appropriate syntax.

    Args:
        json_dict (dict): the dictionary that we will test.

    Returns:
        str: the error message
        bool: does the expression have the appropriate syntax ?
        dict: the cleaned dictionary with proper typing
    """
    err_code = "No instructions received."
    exp_ok = False
    clean_dict: dict[str, ExperimentalInputDict] = {}
    for expr in json_dict:
        err_code = "Wrong experiment name or too many experiments"
        # test the name of the experiment
        if not expr.startswith("experiment_"):
            err_code = "Experiment name must start with experiment_"
            exp_ok = False
            break
        if not expr[11:].isdigit():
            err_code = "Experiment name must end with a number"
            exp_ok = False
            break
        if int(expr[11:]) > self.n_max_experiments:
            err_code = f"Experiment number too high. Must be less than {self.n_max_experiments}"
            exp_ok = False
            break
        exp_ok = True

        # test the structure of the experiment
        err_code, exp_ok = self.check_experiment(json_dict[expr])
        if not exp_ok:
            break
        # time to check the structure of the instructions
        ins_list = json_dict[expr]["instructions"]
        err_code, exp_ok = self.check_instructions(ins_list)
        if not exp_ok:
            break
        clean_dict[expr] = self.get_exp_input_dict(json_dict[expr])
    return err_code.replace("\n", ".."), exp_ok, clean_dict

get_configuration()

Sends back the configuration dictionary of the spooler.

This creates the configuration of the Spooler. However, it does not contain any information about the operational status. This is not really connected to the machine but much rather to the queue etc. So items of the BackendConfigSchemaIn like operational or last_queue_check are not set here and keep their default values.

Returns:

Type Description
BackendConfigSchemaIn

The configuration dictionary of the spooler.
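
Only instructions whose schema declares an `is_gate` field end up in the `gates` entry, while every key of `ins_schema_dict` is listed under `supported_instructions`. A minimal sketch of that filter, assuming stand-in classes that expose `model_fields` and `config_dict()` the way the source uses them (the class names and the returned dictionaries are hypothetical):

```python
class RxSchema:
    """Hypothetical gate schema: declares an is_gate field."""

    model_fields = {"name": str, "wires": list, "params": list, "is_gate": bool}

    @classmethod
    def config_dict(cls) -> dict:
        return {"name": "rx", "parameters": ["theta"]}


class MeasureSchema:
    """Hypothetical non-gate instruction: no is_gate field."""

    model_fields = {"name": str, "wires": list, "params": list}

    @classmethod
    def config_dict(cls) -> dict:
        return {"name": "measure", "parameters": []}


ins_schema_dict = {"rx": RxSchema, "measure": MeasureSchema}

# keep only the schemas that mark themselves as gates
gates = [
    schema.config_dict()
    for schema in ins_schema_dict.values()
    if "is_gate" in schema.model_fields
]
# every registered instruction is reported as supported
supported_instructions = list(ins_schema_dict.keys())
```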

Source code in src/sqooler/spoolers.py
def get_configuration(self) -> BackendConfigSchemaIn:
    """
    Sends back the configuration dictionary of the spooler.

    This creates the configuration of the Spooler. However, it does not contain
    any information about the operational status. This is not really connected to
    the machine but much rather to the queue etc. So items of the `BackendConfigSchemaIn`
    like `operational` or `last_queue_check` are not set here and keep their default values.

    Returns:
        The configuration dictionary of the spooler.
    """
    gate_list = []
    for _, ins_obj in self.ins_schema_dict.items():
        if "is_gate" in ins_obj.model_fields:
            gate_list.append(ins_obj.config_dict())
    backend_config_dict = {
        "description": self.description,
        "version": self.version,
        "cold_atom_type": self.cold_atom_type,
        "gates": gate_list,
        "max_experiments": self.n_max_experiments,
        "max_shots": self.n_max_shots,
        "simulator": True,
        "supported_instructions": list(self.ins_schema_dict.keys()),
        "num_wires": self.n_wires,
        "wire_order": self.wire_order,
        "num_species": self.num_species,
        "display_name": self.display_name,
        "sign": self.sign,
    }
    return BackendConfigSchemaIn(**backend_config_dict)

get_exp_input_dict(json_dict)

Transforms the dictionary into an ExperimentalInputDict object.

Parameters:

Name Type Description Default
json_dict dict

The dictionary that should be transformed.

required

Returns:

Type Description
ExperimentalInputDict

An ExperimentalInputDict object.
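
The method reads four mandatory keys (`instructions`, `shots`, `wire_order`, `num_wires`) and one optional key (`seed`). The sketch below shows that contract with a dataclass standing in for `ExperimentalInputDict`. `DemoInput` and `to_demo_input` are hypothetical names, and the instructions are passed through unchanged rather than converted with `gate_dict_from_list`.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class DemoInput:
    """Stand-in for ExperimentalInputDict."""

    instructions: list
    shots: int
    wire_order: str
    num_wires: int
    seed: Optional[int] = None


def to_demo_input(json_dict: dict) -> DemoInput:
    return DemoInput(
        instructions=list(json_dict["instructions"]),
        shots=json_dict["shots"],  # KeyError if missing
        wire_order=json_dict["wire_order"],  # KeyError if missing
        num_wires=json_dict["num_wires"],  # KeyError if missing
        seed=json_dict.get("seed"),  # optional, defaults to None
    )


exp = to_demo_input(
    {"instructions": [], "shots": 5, "wire_order": "interleaved", "num_wires": 2}
)
```

A missing mandatory key raises a `KeyError` here, which is why the source calls the method only after the experiment has passed `check_experiment`.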

Source code in src/sqooler/spoolers.py
def get_exp_input_dict(self, json_dict: dict) -> ExperimentalInputDict:
    """
    Transforms the dictionary into an ExperimentalInputDict object.

    Args:
        json_dict: The dictionary that should be transformed.

    Returns:
        An ExperimentalInputDict object.
    """
    raw_ins_list = json_dict["instructions"]
    ins_list = [gate_dict_from_list(instr) for instr in raw_ins_list]
    # seed is optional and defaults to None
    exp_info = ExperimentalInputDict(
        instructions=ins_list,
        shots=json_dict["shots"],
        wire_order=json_dict["wire_order"],
        num_wires=json_dict["num_wires"],
        seed=json_dict.get("seed", None),
    )
    return exp_info

get_private_jwk()

Get the private JWK for the spooler.

Returns:

Type Description
JWK

The private JWK for the spooler.

Raises:

Type Description
ValueError

If the private JWK is not set, is empty, or is invalid.
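
The method distinguishes three failure cases: the variable is unset, it is empty, or it cannot be decoded. The sketch below reproduces the first two checks with `os.environ` from the standard library. The source reads the value through a `config(...)` helper instead, and `read_private_jwk_str` is a hypothetical name.

```python
import os


def read_private_jwk_str() -> str:
    """Return the raw key string or raise a ValueError that names the problem."""
    value = os.environ.get("PRIVATE_JWK_STR")
    if value is None:
        raise ValueError("PRIVATE_JWK_STR is not set and available.")
    if value == "":
        raise ValueError("PRIVATE_JWK_STR must not be empty.")
    return value
```

Separating the unset case from the empty case gives the operator a more precise hint about what is wrong with the deployment.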

Source code in src/sqooler/spoolers.py
def get_private_jwk(self) -> JWK:
    """
    Get the private JWK for the spooler.

    Returns:
        The private JWK for the spooler.

    Raises:
        ValueError: If the private JWK is not set, is empty, or is invalid.
    """
    private_jwk_str = config("PRIVATE_JWK_STR", default=None)
    if private_jwk_str == "":
        logging.error("PRIVATE_JWK_STR must not be empty.")

        raise ValueError("PRIVATE_JWK_STR must not be empty.")

    if private_jwk_str is None:
        logging.error("PRIVATE_JWK_STR is not set and available.")
        raise ValueError("PRIVATE_JWK_STR is not set and available.")
    try:
        return jwk_from_config_str(private_jwk_str)
    except BinasciiError as bin_err:
        logging.error("PRIVATE_JWK_STR is invalid.")
        raise ValueError("PRIVATE_JWK_STR is invalid.") from bin_err

LabscriptSpooler

Bases: BaseSpooler

A specialized spooler class that allows us to execute jobs in labscript directly. The main changes are that we need to add the job in a different way and connect it to a runmanager.remoteClient. It adds three new attributes to the BaseSpooler class.

Attributes:

Name Type Description
remote_client

The remote client that is used to connect to the labscript server.

labscript_params

The parameters that are used to generate the folder for the shots.

run

The run object that is used to execute the labscript file.
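
For each experiment, the spooler resets the shot output folder of the remote client and then redirects it to a sub-folder named after the job and the experiment. The sketch below illustrates that redirection with a fake client. `FakeRemoteClient` and the folder `C:\labscript\shots\default` are hypothetical, the path-based variant suggested in the source comment is used rather than the `rsplit` call, and `ntpath` is chosen so that the Windows-style path is handled identically on every platform.

```python
import ntpath


class FakeRemoteClient:
    """Hypothetical stand-in that offers the three calls the spooler uses."""

    def __init__(self, folder: str):
        self._default = folder
        self._folder = folder

    def reset_shot_output_folder(self) -> None:
        self._folder = self._default

    def get_shot_output_folder(self) -> str:
        return self._folder

    def set_shot_output_folder(self, folder: str) -> None:
        self._folder = folder


def modify_shot_output_folder(client: FakeRemoteClient, new_dir: str) -> str:
    default_folder = str(client.get_shot_output_folder())
    # replace the last path component by new_dir
    modified = ntpath.join(ntpath.dirname(default_folder), new_dir)
    client.set_shot_output_folder(modified)
    return modified


client = FakeRemoteClient("C:\\labscript\\shots\\default")
client.reset_shot_output_folder()
path = modify_shot_output_folder(client, "job_1/experiment_0")
```

Resetting before each redirection matters: without it, the folder of the second experiment would be derived from the folder of the first one.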

Source code in src/sqooler/spoolers.py
class LabscriptSpooler(BaseSpooler):
    """
    A specialized spooler class that allows us to execute jobs in labscript directly.
    The main changes are that we need to add the job in a different way and connect it to a
     `runmanager.remoteClient`. It adds three new attributes to the `BaseSpooler` class.

    Attributes:
        remote_client: The remote client that is used to connect to the labscript server.
        labscript_params: The parameters that are used to generate the folder for the shots.
        run: The run object that is used to execute the labscript file.
    """

    def __init__(
        self,
        ins_schema_dict: dict,
        device_config: Type[BaseModel],
        n_wires: int,
        remote_client: Any,  # it would be really nice to fix this type
        labscript_params: LabscriptParams,
        run: Any,  # it would be really nice to fix this type
        description: str = "",
        n_max_shots: int = 1000,
        version: str = "0.0.1",
        cold_atom_type: ColdAtomStr = "spin",
        n_max_experiments: int = 15,
        wire_order: WireOrderStr = "interleaved",
        num_species: int = 1,
        sign: bool = False,
    ):
        """
        The constructor of the class. The arguments are the same as for the BaseSpooler
        class with three additions: `remote_client`, `labscript_params` and `run`.
        """
        super().__init__(
            ins_schema_dict,
            device_config,
            n_wires,
            description,
            n_max_shots,
            version,
            cold_atom_type,
            n_max_experiments,
            wire_order,
            num_species,
            sign,
        )
        self.remote_client = remote_client
        self.labscript_params = labscript_params
        self.run = run

    def add_job(
        self, json_dict: dict[str, dict], job_id: str
    ) -> tuple[ResultDict, StatusMsgDict]:
        """
        The function that translates the json with the instructions into some circuit
        and executes it. It performs several checks for the job to see if it is properly
        working. If things are fine the job gets added to the list of things that should be
        executed.

        Args:
            json_dict: The job dictionary of all the instructions.
            job_id: the id of the job we are treating.

        Returns:
            result_dict: The dictionary with the results of the job.
            status_msg_dict: The status dictionary of the job.
        """
        result_dict, status_msg_dict, clean_dict = self._prep_job(json_dict, job_id)

        if status_msg_dict.status == "ERROR":
            return result_dict, status_msg_dict

        for exp_name, exp_info in clean_dict.items():
            # prepare the shots folder
            self.remote_client.reset_shot_output_folder()
            self._modify_shot_output_folder(job_id + "/" + str(exp_name))

            try:
                result_dict.results.append(self.gen_circuit(exp_name, exp_info, job_id))
            except FileNotFoundError as err:
                error_message = str(err)
                status_msg_dict.detail += "; Failed to generate labscript file."
                # implicit concatenation keeps the indentation out of the message
                status_msg_dict.error_message += (
                    "; Failed to generate labscript file. "
                    f"Error: {error_message}"
                )
                status_msg_dict.status = "ERROR"
                return result_dict, status_msg_dict
        status_msg_dict.detail += (
            "; Passed json sanity check; Compilation done. Shots sent to solver."
        )
        status_msg_dict.status = "DONE"
        return result_dict, status_msg_dict

    def _modify_shot_output_folder(self, new_dir: str) -> str:
        """
        Replaces the last component of the current shot output folder with
        `new_dir` and sets the result as the new shot output folder of the
        remote client.

        Args:
            new_dir: The new directory under which we save the shots.

        Returns:
            The path to the new directory.
        """

        # we should simplify this at some point
        default_shot_folder = str(self.remote_client.get_shot_output_folder())

        modified_shot_folder = (default_shot_folder.rsplit("\\", 1)[0]) + "/" + new_dir
        # suggested better method which works the same way on all platforms
        # modified_shot_folder = os.path.join(
        #    os.path.dirname(default_shot_folder), new_dir
        # )
        self.remote_client.set_shot_output_folder(modified_shot_folder)
        return modified_shot_folder

    def gen_circuit(
        self, exp_name: str, json_dict: ExperimentalInputDict, job_id: str
    ) -> ExperimentDict:
        """
        Generates the labscript file for one experiment, runs it and collects the
        results of the shots.

        Args:
            exp_name: The name of the experiment.
            json_dict: The dictionary that contains the instructions for the circuit.
            job_id: The id of the job that is being executed.

        Returns:
            The ExperimentDict object describing the results.
        """
        # parameters for the function
        exp_script_folder = self.labscript_params.exp_script_folder

        # local files
        header_path = f"{exp_script_folder}/header.py"
        remote_experiments_path = f"{exp_script_folder}/remote_experiments"
        # make sure that the folder exists
        if not os.path.exists(remote_experiments_path):
            raise FileNotFoundError(
                f"The path {remote_experiments_path} does not exist."
            )

        n_shots = json_dict.shots
        ins_list = json_dict.instructions

        globals_dict = {
            "job_id": "guest",
            "shots": 4,
        }
        globals_dict["shots"] = list(range(n_shots))
        globals_dict["job_id"] = job_id

        self.remote_client.set_globals(globals_dict)
        script_name = f"experiment_{globals_dict['job_id']}.py"
        exp_script = os.path.join(remote_experiments_path, script_name)
        code = ""
        # this is the top part of the script it allows us to import the
        # typical functions that we require for each single sequence
        # first have a look if the file exists
        if not os.path.exists(header_path):
            raise FileNotFoundError(f"Header file not found at {header_path}")

        with open(header_path, "r", encoding="UTF-8") as header_file:
            code = header_file.read()

        # add a line break to the code
        code += "\n"

        with open(exp_script, "w", encoding="UTF-8") as script_file:
            script_file.write(code)

        for inst in ins_list:
            # we can directly use the function name as we have already verified
            # that the function exists in the `add_job` function
            code = f"Experiment.{inst.name}({inst.wires}, {inst.params})\n"
            # we should add proper error handling here
            # pylint: disable=bare-except
            try:
                with open(exp_script, "a", encoding="UTF-8") as script_file:
                    script_file.write(code)
            except:
                logging.error("Something went wrong. Does the file path exist?")

        code = "Experiment.final_action()" + "\n" + "stop(Experiment.t+0.1)"
        # pylint: disable=bare-except
        try:
            with open(exp_script, "a", encoding="UTF-8") as script_file:
                script_file.write(code)
        except:
            logging.error("Something went wrong. Does the file path exist?")
        self.remote_client.set_labscript_file(
            exp_script
        )  # CAUTION !! This command only selects the file. It does not generate it!

        # be careful. This is not a blocking command
        self.remote_client.engage()

        # now that we have engaged the calculation we need to wait for the
        # calculation to be done

        # we need to get the current shot output folder
        current_shot_folder = self.remote_client.get_shot_output_folder()
        # we need to get the list of files in the folder
        hdf5_files = get_file_queue(current_shot_folder)

        # we need to wait until we have the right number of files
        while len(hdf5_files) < n_shots:
            sleep(self.labscript_params.t_wait)
            hdf5_files = get_file_queue(current_shot_folder)

        shots_array = []
        # once the files are there we can read them
        for file in hdf5_files:
            this_run = self.run(current_shot_folder + "/" + file)
            got_nat = False
            n_tries = 0
            # sometimes the file is not ready yet. We need to wait a bit
            while not got_nat and n_tries < 15:
                # the exception is raised if the file is not ready yet
                # it is broadly defined within labscript so we cannot do anything about
                # it here.
                # pylint: disable=W0718
                try:
                    # append the result to the array
                    shots_array.append(this_run.get_results("/measure", "nat"))
                    got_nat = True
                except Exception as exc:
                    logging.exception(exc)
                    sleep(self.labscript_params.t_wait)
                    n_tries += 1
        exp_sub_dict = create_memory_data(shots_array, exp_name, n_shots, ins_list)
        return exp_sub_dict
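
The path handling in `_modify_shot_output_folder` above splits on a backslash, which suggests that the remote client returns a Windows-style path. Under that assumption, the same "replace the last component" step can be sketched with `pathlib.PureWindowsPath`, which parses Windows paths on any operating system. The function name `sibling_shot_folder` and the example path are hypothetical and not part of sqooler.

```python
from pathlib import PureWindowsPath


def sibling_shot_folder(default_shot_folder: str, new_dir: str) -> str:
    """Replace the last component of a Windows path with `new_dir`."""
    # PureWindowsPath only manipulates the string, it never touches the disk
    parent = PureWindowsPath(default_shot_folder).parent
    return str(parent / new_dir)


# hypothetical default folder as it might be reported by the remote client
folder = sibling_shot_folder(r"C:\Experiments\shots\2024", "job_1/exp_0")
print(folder)  # C:\Experiments\shots\job_1\exp_0
```

Unlike the string concatenation in the source, this variant returns a path with uniform separators.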

__init__(ins_schema_dict, device_config, n_wires, remote_client, labscript_params, run, description='', n_max_shots=1000, version='0.0.1', cold_atom_type='spin', n_max_experiments=15, wire_order='interleaved', num_species=1, sign=False)

The constructor of the class. The arguments are the same as for the Spooler class with three additions: remote_client, labscript_params and run.

Source code in src/sqooler/spoolers.py
def __init__(
    self,
    ins_schema_dict: dict,
    device_config: Type[BaseModel],
    n_wires: int,
    remote_client: Any,  # it would be really nice to fix this type
    labscript_params: LabscriptParams,
    run: Any,  # it would be really nice to fix this type
    description: str = "",
    n_max_shots: int = 1000,
    version: str = "0.0.1",
    cold_atom_type: ColdAtomStr = "spin",
    n_max_experiments: int = 15,
    wire_order: WireOrderStr = "interleaved",
    num_species: int = 1,
    sign: bool = False,
):
    """
    The constructor of the class. The arguments are the same as for the Spooler
    class with three additions: `remote_client`, `labscript_params` and `run`.
    """
    super().__init__(
        ins_schema_dict,
        device_config,
        n_wires,
        description,
        n_max_shots,
        version,
        cold_atom_type,
        n_max_experiments,
        wire_order,
        num_species,
        sign,
    )
    self.remote_client = remote_client
    self.labscript_params = labscript_params
    self.run = run

add_job(json_dict, job_id)

The function that translates the json with the instructions into a circuit and executes it. It performs several checks on the job to see if it is valid. If things are fine, the job gets added to the list of things that should be executed.

Parameters:

Name Type Description Default
json_dict dict[str, dict]

The job dictionary with all the instructions.

required
job_id str

The id of the job we are treating.

required

Returns:

Name Type Description
result_dict ResultDict

The dictionary with the results of the job.

status_msg_dict StatusMsgDict

The status dictionary of the job.

Source code in src/sqooler/spoolers.py
def add_job(
    self, json_dict: dict[str, dict], job_id: str
) -> tuple[ResultDict, StatusMsgDict]:
    """
    The function that translates the json with the instructions into a circuit
    and executes it. It performs several checks on the job to see if it is
    valid. If things are fine, the job gets added to the list of things that
    should be executed.

    Args:
        json_dict: The job dictionary with all the instructions.
        job_id: The id of the job we are treating.

    Returns:
        result_dict: The dictionary with the results of the job.
        status_msg_dict: The status dictionary of the job.
    """
    result_dict, status_msg_dict, clean_dict = self._prep_job(json_dict, job_id)

    if status_msg_dict.status == "ERROR":
        return result_dict, status_msg_dict

    for exp_name, exp_info in clean_dict.items():
        # prepare the shots folder
        self.remote_client.reset_shot_output_folder()
        self._modify_shot_output_folder(job_id + "/" + str(exp_name))

        try:
            result_dict.results.append(self.gen_circuit(exp_name, exp_info, job_id))
        except FileNotFoundError as err:
            error_message = str(err)
            status_msg_dict.detail += "; Failed to generate labscript file."
            status_msg_dict.error_message += (
                f"; Failed to generate labscript file. Error: {error_message}"
            )
            status_msg_dict.status = "ERROR"
            return result_dict, status_msg_dict
    status_msg_dict.detail += (
        "; Passed json sanity check; Compilation done. Shots sent to solver."
    )
    status_msg_dict.status = "DONE"
    return result_dict, status_msg_dict

gen_circuit(exp_name, json_dict, job_id)

Generates the labscript file for one experiment, runs it and collects the results of the shots.

Parameters:

Name Type Description Default
exp_name str

The name of the experiment

required
json_dict ExperimentalInputDict

The dictionary that contains the instructions for the circuit.

required
job_id str

The id of the job that is being executed.

required

Returns:

Type Description
ExperimentDict

The ExperimentDict object describing the results.
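
The script that `gen_circuit` writes consists of the content of `header.py`, one `Experiment.<name>(wires, params)` call per instruction and two closing lines. A minimal sketch of that assembly as a pure string operation is given here. The `Instruction` dataclass, the function `build_script` and the gate names `load` and `rlx` are hypothetical stand-ins chosen for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class Instruction:
    """Hypothetical stand-in for an instruction with name, wires and params."""

    name: str
    wires: list
    params: list = field(default_factory=list)


def build_script(header: str, instructions: list) -> str:
    """Assemble the script text: header, one call per instruction, footer."""
    code = header + "\n"
    for inst in instructions:
        # each instruction becomes one call on the Experiment object
        code += f"Experiment.{inst.name}({inst.wires}, {inst.params})\n"
    # the closing lines that end the sequence
    code += "Experiment.final_action()" + "\n" + "stop(Experiment.t+0.1)"
    return code


script = build_script(
    "from header import Experiment\n",  # assumed header content
    [Instruction("load", [0]), Instruction("rlx", [0], [0.5])],
)
print(script)
```

Building the text in memory first would allow a single write to disk, instead of re-opening the file for every instruction.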

Source code in src/sqooler/spoolers.py
def gen_circuit(
    self, exp_name: str, json_dict: ExperimentalInputDict, job_id: str
) -> ExperimentDict:
    """
    Generates the labscript file for one experiment, runs it and collects the
    results of the shots.

    Args:
        exp_name: The name of the experiment.
        json_dict: The dictionary that contains the instructions for the circuit.
        job_id: The id of the job that is being executed.

    Returns:
        The ExperimentDict object describing the results.
    """
    # parameters for the function
    exp_script_folder = self.labscript_params.exp_script_folder

    # local files
    header_path = f"{exp_script_folder}/header.py"
    remote_experiments_path = f"{exp_script_folder}/remote_experiments"
    # make sure that the folder exists
    if not os.path.exists(remote_experiments_path):
        raise FileNotFoundError(
            f"The path {remote_experiments_path} does not exist."
        )

    n_shots = json_dict.shots
    ins_list = json_dict.instructions

    globals_dict = {
        "job_id": "guest",
        "shots": 4,
    }
    globals_dict["shots"] = list(range(n_shots))
    globals_dict["job_id"] = job_id

    self.remote_client.set_globals(globals_dict)
    script_name = f"experiment_{globals_dict['job_id']}.py"
    exp_script = os.path.join(remote_experiments_path, script_name)
    code = ""
    # this is the top part of the script it allows us to import the
    # typical functions that we require for each single sequence
    # first have a look if the file exists
    if not os.path.exists(header_path):
        raise FileNotFoundError(f"Header file not found at {header_path}")

    with open(header_path, "r", encoding="UTF-8") as header_file:
        code = header_file.read()

    # add a line break to the code
    code += "\n"

    with open(exp_script, "w", encoding="UTF-8") as script_file:
        script_file.write(code)

    for inst in ins_list:
        # we can directly use the function name as we have already verified
        # that the function exists in the `add_job` function
        code = f"Experiment.{inst.name}({inst.wires}, {inst.params})\n"
        # we should add proper error handling here
        # pylint: disable=bare-except
        try:
            with open(exp_script, "a", encoding="UTF-8") as script_file:
                script_file.write(code)
        except:
            logging.error("Something went wrong. Does the file path exist?")

    code = "Experiment.final_action()" + "\n" + "stop(Experiment.t+0.1)"
    # pylint: disable=bare-except
    try:
        with open(exp_script, "a", encoding="UTF-8") as script_file:
            script_file.write(code)
    except:
        logging.error("Something went wrong. Does the file path exist?")
    self.remote_client.set_labscript_file(
        exp_script
    )  # CAUTION !! This command only selects the file. It does not generate it!

    # be careful. This is not a blocking command
    self.remote_client.engage()

    # now that we have engaged the calculation we need to wait for the
    # calculation to be done

    # we need to get the current shot output folder
    current_shot_folder = self.remote_client.get_shot_output_folder()
    # we need to get the list of files in the folder
    hdf5_files = get_file_queue(current_shot_folder)

    # we need to wait until we have the right number of files
    while len(hdf5_files) < n_shots:
        sleep(self.labscript_params.t_wait)
        hdf5_files = get_file_queue(current_shot_folder)

    shots_array = []
    # once the files are there we can read them
    for file in hdf5_files:
        this_run = self.run(current_shot_folder + "/" + file)
        got_nat = False
        n_tries = 0
        # sometimes the file is not ready yet. We need to wait a bit
        while not got_nat and n_tries < 15:
            # the exception is raised if the file is not ready yet
            # it is broadly defined within labscript so we cannot do anything about
            # it here.
            # pylint: disable=W0718
            try:
                # append the result to the array
                shots_array.append(this_run.get_results("/measure", "nat"))
                got_nat = True
            except Exception as exc:
                logging.exception(exc)
                sleep(self.labscript_params.t_wait)
                n_tries += 1
    exp_sub_dict = create_memory_data(shots_array, exp_name, n_shots, ins_list)
    return exp_sub_dict
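
The loop in `gen_circuit` that waits for the shot files polls the folder until enough files are present and has no upper bound on the waiting time. The following sketch shows the same polling idea with an added timeout. The timeout is a variation and not part of the source, and the function name `wait_for_files` and the file names are hypothetical.

```python
import os
import tempfile
import time


def wait_for_files(dir_path: str, n_files: int, t_wait: float, timeout: float) -> list:
    """Poll `dir_path` until it holds at least `n_files` files or time runs out."""
    deadline = time.monotonic() + timeout
    while True:
        files = [
            name
            for name in os.listdir(dir_path)
            if os.path.isfile(os.path.join(dir_path, name))
        ]
        if len(files) >= n_files:
            return files
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Only {len(files)} of {n_files} files arrived.")
        time.sleep(t_wait)


with tempfile.TemporaryDirectory() as shot_folder:
    # create two empty files that play the role of the shot files
    for idx in range(2):
        with open(os.path.join(shot_folder, f"shot_{idx}.h5"), "w", encoding="UTF-8"):
            pass
    found = wait_for_files(shot_folder, n_files=2, t_wait=0.01, timeout=1.0)

print(sorted(found))  # ['shot_0.h5', 'shot_1.h5']
```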

Spooler

Bases: BaseSpooler

The class for the spooler as it can be used for simulators.

Source code in src/sqooler/spoolers.py
class Spooler(BaseSpooler):
    """
    The class for the spooler as it can be used for simulators.
    """

    @property
    def gen_circuit(self) -> Callable[[str, ExperimentalInputDict], ExperimentDict]:
        """
        The function that generates the circuit.
        It can be basically anything that allows the execution of the circuit.

        Returns:
            The function that generates the circuit.

        Raises:
            ValueError: if gen_circuit has not been set
        """
        if not hasattr(self, "_gen_circuit"):
            raise ValueError("gen_circuit must be set")
        return self._gen_circuit

    @gen_circuit.setter
    def gen_circuit(
        self, value: Callable[[str, ExperimentalInputDict], ExperimentDict]
    ) -> None:
        """
        The setter for the gen_circuit function. The first argument is the name of the
        experiment and the second argument is the dictionary with the instructions.

        Args:
            value: The function that generates the circuit.
        """
        if callable(value):  # Check if the provided value is a callable (function)
            self._gen_circuit = value
        else:
            raise ValueError("gen_circuit must be a callable function")

    def add_job(
        self, json_dict: dict[str, dict], job_id: str
    ) -> tuple[ResultDict, StatusMsgDict]:
        """
        The function that translates the json with the instructions into a circuit and executes it.
        It performs several checks on the job to see if it is valid.
        If things are fine, the job gets added to the list of things that should be executed.

        Args:
            json_dict: The job dictionary with all the instructions.
            job_id: The id of the job we are treating.

        Returns:
            result_dict: The dictionary with the results of the job.
            status_msg_dict: The status dictionary of the job.
        """
        result_dict, status_msg_dict, clean_dict = self._prep_job(json_dict, job_id)
        if status_msg_dict.status == "ERROR":
            return result_dict, status_msg_dict
        # now we can generate the circuit for each experiment
        for exp_name, exp_info in clean_dict.items():
            try:
                # not sure why pylint is complaining here so we disable it
                result_dict.results.append(
                    self.gen_circuit(exp_name, exp_info)  # pylint: disable=E1102
                )
                logging.info("Experiment %s done.", exp_name)
            except ValueError as err:
                status_msg_dict.detail += "; " + str(err)
                status_msg_dict.error_message += "; " + str(err)
                status_msg_dict.status = "ERROR"
                logging.exception(
                    "Error in gen_circuit.",
                    extra={"error_message": status_msg_dict.error_message},
                )
                return result_dict, status_msg_dict
        status_msg_dict.detail += (
            "; Passed json sanity check; Compilation done. Shots sent to solver."
        )
        status_msg_dict.status = "DONE"
        return result_dict, status_msg_dict

gen_circuit: Callable[[str, ExperimentalInputDict], ExperimentDict] property writable

The function that generates the circuit. It can be basically anything that allows the execution of the circuit.

Returns:

Type Description
Callable[[str, ExperimentalInputDict], ExperimentDict]

The function that generates the circuit.

Raises:

Type Description
ValueError

if gen_circuit has not been set
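
The pattern behind this property can be illustrated without the rest of the package. The sketch below uses a hypothetical `MiniSpooler` class that only contains the property and its setter, and a hypothetical `dummy_circuit` function that returns a plain dictionary instead of an `ExperimentDict`.

```python
from typing import Callable


class MiniSpooler:
    """Hypothetical reduced class that only holds the gen_circuit property."""

    @property
    def gen_circuit(self) -> Callable[[str, dict], dict]:
        # reading the property before it was set is reported as a ValueError
        if not hasattr(self, "_gen_circuit"):
            raise ValueError("gen_circuit must be set")
        return self._gen_circuit

    @gen_circuit.setter
    def gen_circuit(self, value: Callable[[str, dict], dict]) -> None:
        # only callables are accepted
        if not callable(value):
            raise ValueError("gen_circuit must be a callable function")
        self._gen_circuit = value


def dummy_circuit(exp_name: str, exp_info: dict) -> dict:
    """Placeholder for a simulator. It only echoes its input."""
    return {"name": exp_name, "shots": exp_info["shots"]}


spooler = MiniSpooler()
spooler.gen_circuit = dummy_circuit
result = spooler.gen_circuit("experiment_0", {"shots": 2})
print(result)  # {'name': 'experiment_0', 'shots': 2}
```

Assigning something that is not callable, for example `spooler.gen_circuit = 3`, raises a `ValueError` in this sketch.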

add_job(json_dict, job_id)

The function that translates the json with the instructions into a circuit and executes it. It performs several checks on the job to see if it is valid. If things are fine, the job gets added to the list of things that should be executed.

Parameters:

Name Type Description Default
json_dict dict[str, dict]

The job dictionary with all the instructions.

required
job_id str

The id of the job we are treating.

required

Returns:

Name Type Description
result_dict ResultDict

The dictionary with the results of the job.

status_msg_dict StatusMsgDict

The status dictionary of the job.

Source code in src/sqooler/spoolers.py
def add_job(
    self, json_dict: dict[str, dict], job_id: str
) -> tuple[ResultDict, StatusMsgDict]:
    """
    The function that translates the json with the instructions into a circuit and executes it.
    It performs several checks on the job to see if it is valid.
    If things are fine, the job gets added to the list of things that should be executed.

    Args:
        json_dict: The job dictionary with all the instructions.
        job_id: The id of the job we are treating.

    Returns:
        result_dict: The dictionary with the results of the job.
        status_msg_dict: The status dictionary of the job.
    """
    result_dict, status_msg_dict, clean_dict = self._prep_job(json_dict, job_id)
    if status_msg_dict.status == "ERROR":
        return result_dict, status_msg_dict
    # now we can generate the circuit for each experiment
    for exp_name, exp_info in clean_dict.items():
        try:
            # not sure why pylint is complaining here so we disable it
            result_dict.results.append(
                self.gen_circuit(exp_name, exp_info)  # pylint: disable=E1102
            )
            logging.info("Experiment %s done.", exp_name)
        except ValueError as err:
            status_msg_dict.detail += "; " + str(err)
            status_msg_dict.error_message += "; " + str(err)
            status_msg_dict.status = "ERROR"
            logging.exception(
                "Error in gen_circuit.",
                extra={"error_message": status_msg_dict.error_message},
            )
            return result_dict, status_msg_dict
    status_msg_dict.detail += (
        "; Passed json sanity check; Compilation done. Shots sent to solver."
    )
    status_msg_dict.status = "DONE"
    return result_dict, status_msg_dict

create_memory_data(shots_array, exp_name, n_shots, instructions=None)

The function to create the memory key in the results dictionary with proper formatting.

Parameters:

Name Type Description Default
shots_array list

The array with the shots.

required
exp_name str

The name of the experiment.

required
n_shots int

The number of shots.

required
instructions Optional[list[GateDict]]

The list of instructions that were executed

None

Returns:

Type Description
ExperimentDict

The ExperimentDict object describing the results.

Source code in src/sqooler/spoolers.py
def create_memory_data(
    shots_array: list,
    exp_name: str,
    n_shots: int,
    instructions: Optional[list[GateDict]] = None,
) -> ExperimentDict:
    """
    The function to create the memory key in the results dictionary
    with proper formatting.

    Args:
        shots_array: The array with the shots.
        exp_name: The name of the experiment.
        n_shots: The number of shots.
        instructions: The list of instructions that were executed

    Returns:
        The ExperimentDict object describing the results.
    """
    exp_sub_dict: dict = {
        "header": {"name": "experiment_0", "extra metadata": "text"},
        "shots": 3,
        "success": True,
        "data": {"memory": None},
    }

    exp_sub_dict["header"]["name"] = exp_name
    exp_sub_dict["shots"] = n_shots
    memory_list = [
        str(shot).replace("[", "").replace("]", "").replace(",", "")
        for shot in shots_array
    ]
    exp_sub_dict["data"]["memory"] = memory_list
    if instructions is not None:
        exp_sub_dict["data"]["instructions"] = instructions
    return ExperimentDict(**exp_sub_dict)
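
The formatting step of `create_memory_data` turns each shot into a string by removing brackets and commas from its string representation. A small sketch of just that step, with a hypothetical helper name `format_memory` and made-up shot values:

```python
def format_memory(shots_array: list) -> list:
    """Turn each shot into a space-separated string, as done for the memory key."""
    return [
        str(shot).replace("[", "").replace("]", "").replace(",", "")
        for shot in shots_array
    ]


# three made-up shots on two wires
memory = format_memory([[1, 0], [0, 1], [1, 1]])
print(memory)  # ['1 0', '0 1', '1 1']
```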

gate_dict_from_list(inst_list)

Transforms a list into an appropriate dictionary for instructions. The list is assumed to be in the format [name, wires, params].

Parameters:

Name Type Description Default
inst_list list

The list that should be transformed.

required

Returns:

Type Description
GateDict

A GateDict object.

Source code in src/sqooler/spoolers.py
def gate_dict_from_list(inst_list: list) -> GateDict:
    """
    Transforms a list into an appropriate dictionary for instructions. The list
    is assumed to be in the format [name, wires, params].

    Args:
        inst_list: The list that should be transformed.

    Returns:
        A GateDict object.
    """
    gate_draft = {"name": inst_list[0], "wires": inst_list[1], "params": inst_list[2]}
    return GateDict(**gate_draft)
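
To show the expected list layout without importing the package, the sketch below mirrors the conversion with a hypothetical `MiniGate` dataclass in place of `GateDict`. The gate name `rlx` and its values are made up for the example.

```python
from dataclasses import dataclass


@dataclass
class MiniGate:
    """Hypothetical stand-in for GateDict with the same three fields."""

    name: str
    wires: list
    params: list


def mini_gate_from_list(inst_list: list) -> MiniGate:
    """Convert [name, wires, params] into a MiniGate."""
    # unpacking fails with a ValueError if the list does not have three entries
    name, wires, params = inst_list
    return MiniGate(name=name, wires=wires, params=params)


gate = mini_gate_from_list(["rlx", [0], [0.7]])
print(gate)
```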

get_file_queue(dir_path)

A function that returns the list of files in the directory.

Parameters:

Name Type Description Default
dir_path str

The path to the directory.

required

Returns:

Type Description
list[str]

A list of files in the directory. It excludes directories.

Raises:

Type Description
ValueError

If the path is not a directory.

Source code in src/sqooler/spoolers.py
def get_file_queue(dir_path: str) -> list[str]:
    """
    A function that returns the list of files in the directory.

    Args:
        dir_path: The path to the directory.

    Returns:
        A list of files in the directory. It excludes directories.

    Raises:
        ValueError: If the path is not a directory.
    """

    # make sure that the path is an existing directory
    if not os.path.isdir(dir_path):
        raise ValueError(f"The path {dir_path} is not a directory.")
    files = [
        file
        for file in os.listdir(dir_path)
        if os.path.isfile(os.path.join(dir_path, file))
    ]
    return files
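
The behaviour can be tried out on a temporary directory. The sketch below re-implements the same logic under the hypothetical name `list_files` so that it runs without the package. The file and folder names are made up.

```python
import os
import tempfile


def list_files(dir_path: str) -> list:
    """Return the names of the files in `dir_path`, leaving out sub-directories."""
    if not os.path.isdir(dir_path):
        raise ValueError(f"The path {dir_path} is not a directory.")
    return [
        name
        for name in os.listdir(dir_path)
        if os.path.isfile(os.path.join(dir_path, name))
    ]


with tempfile.TemporaryDirectory() as tmp_dir:
    # one file and one sub-directory, only the file should be listed
    with open(os.path.join(tmp_dir, "shot_0.h5"), "w", encoding="UTF-8"):
        pass
    os.mkdir(os.path.join(tmp_dir, "subfolder"))
    files = list_files(tmp_dir)

print(files)  # ['shot_0.h5']
```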
