Skip to content

Task

KernelBatchTask dataclass

KernelBatchTask(
    *,
    context_name: str,
    program_language: str,
    language_version: str = "0.1.0",
    future_cls: type[FutureType] = Future,
    kernels: list[Method],
    arguments: list[dict] | None = None,
    num_shots: int | list[int],
    metadata: list[dict] | None = None
)

Bases: TaskABC[FutureType]


              flowchart TD
              bloqade.core.device.task.KernelBatchTask[KernelBatchTask]
              bloqade.core.device.task.TaskABC[TaskABC]
              bloqade.core.device.mixins.AuthMixin[AuthMixin]

                              bloqade.core.device.task.TaskABC --> bloqade.core.device.task.KernelBatchTask
                                bloqade.core.device.mixins.AuthMixin --> bloqade.core.device.task.TaskABC
                



              click bloqade.core.device.task.KernelBatchTask href "" "bloqade.core.device.task.KernelBatchTask"
              click bloqade.core.device.task.TaskABC href "" "bloqade.core.device.task.TaskABC"
              click bloqade.core.device.mixins.AuthMixin href "" "bloqade.core.device.mixins.AuthMixin"
            

Task that runs multiple kernels, one subtask per kernel.

Attributes:

Name Type Description
kernels list[Method]

Kernels to execute.

arguments list[dict] | None

Per-kernel argument dictionaries. Defaults to None.

num_shots int | list[int]

Shot count per kernel, or one value broadcast to every kernel.

metadata list[dict] | None

Per-kernel metadata. Defaults to None.

num_subtasks property

num_subtasks: int

Number of subtasks in this task's definition.

get_arguments

get_arguments() -> list[dict] | None

Return per-subtask argument dictionaries.

Returns:

Type Description
list[dict] | None

list[dict] | None: One argument dictionary per subtask, or None when no arguments are set.

Source code in src/bloqade/core/device/task.py
471
472
def get_arguments(self) -> list[dict] | None:
    return self.arguments

get_kernels

get_kernels() -> list[ir.Method]

Return the kernels used to build the task's programs.

Returns:

Type Description
list[Method]

list[ir.Method]: Kernels in program-index order.

Source code in src/bloqade/core/device/task.py
468
469
def get_kernels(self) -> list[ir.Method]:
    """Return the kernels of this batch.

    Returns:
        list[ir.Method]: The stored ``kernels`` attribute, unchanged.
    """
    batch_kernels = self.kernels
    return batch_kernels

get_metadata

get_metadata() -> list[dict] | None

Return per-subtask metadata dictionaries.

Returns:

Type Description
list[dict] | None

list[dict] | None: One metadata dictionary per subtask, or None when no metadata is set.

Source code in src/bloqade/core/device/task.py
479
480
def get_metadata(self) -> list[dict] | None:
    return self.metadata

get_num_shots

get_num_shots() -> list[int]

Return the per-subtask shot counts.

Returns:

Type Description
list[int]

list[int]: Shot count for each subtask, in subtask order.

Source code in src/bloqade/core/device/task.py
474
475
476
477
def get_num_shots(self) -> list[int]:
    """Return the shot count for each subtask.

    Returns:
        list[int]: ``num_shots`` itself when it is already a list;
            otherwise the single integer repeated ``num_subtasks`` times.
    """
    shots = self.num_shots
    # A lone integer is broadcast to every subtask.
    return [shots] * self.num_subtasks if isinstance(shots, int) else shots

summary

summary() -> str

Return a human-readable summary printed on dry-run.

Returns:

Name Type Description
str str

Summary describing what would be submitted.

Source code in src/bloqade/core/device/task.py
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
def summary(self) -> str:
    """Return a human-readable summary printed on dry-run.

    Each kernel is listed as ``name(key=value, ...) - N shots``. Arguments
    are rendered as ``key=value`` pairs joined by ", ", and the text ends on
    the closing separator line without a trailing newline.

    Returns:
        str: Summary describing what would be submitted.
    """
    separator = "=" * 60
    lines = [
        separator,
        "DRY RUN -- NO PROGRAM WAS ACTUALLY SUBMITTED FOR EXECUTION",
        f"Would now submit a task containing {len(self.kernels)} programs:",
    ]
    for i, kernel in enumerate(self.kernels):
        if self.arguments is not None:
            # Show values as well as names, with no dangling separator.
            formatted_arguments = ", ".join(
                f"{key}={value}" for key, value in self.arguments[i].items()
            )
        else:
            formatted_arguments = ""
        # num_shots is either one value for every kernel or one per kernel.
        shots = (
            self.num_shots if isinstance(self.num_shots, int) else self.num_shots[i]
        )
        lines.append(f"  * {kernel.sym_name}({formatted_arguments}) - {shots} shots")
    lines.append("Set dry_run=False to actually execute the programs.")
    lines.append(separator)
    return "\n".join(lines)

ParameterScanTask dataclass

ParameterScanTask(
    *,
    context_name: str,
    program_language: str,
    language_version: str = "0.1.0",
    future_cls: type[FutureType] = Future,
    kernel: Method,
    arguments: list[dict],
    num_shots: int | list[int],
    metadata: list[dict] | None = None
)

Bases: TaskABC[FutureType]


              flowchart TD
              bloqade.core.device.task.ParameterScanTask[ParameterScanTask]
              bloqade.core.device.task.TaskABC[TaskABC]
              bloqade.core.device.mixins.AuthMixin[AuthMixin]

                              bloqade.core.device.task.TaskABC --> bloqade.core.device.task.ParameterScanTask
                                bloqade.core.device.mixins.AuthMixin --> bloqade.core.device.task.TaskABC
                



              click bloqade.core.device.task.ParameterScanTask href "" "bloqade.core.device.task.ParameterScanTask"
              click bloqade.core.device.task.TaskABC href "" "bloqade.core.device.task.TaskABC"
              click bloqade.core.device.mixins.AuthMixin href "" "bloqade.core.device.mixins.AuthMixin"
            

Task that runs one kernel against multiple argument sets.

Each entry in arguments becomes a subtask. The same program is reused for every subtask.

Attributes:

Name Type Description
kernel Method

Kernel executed for each parameter set.

arguments list[dict]

Argument dictionaries, one per subtask.

num_shots int | list[int]

Shot count per subtask, or one value broadcast to every subtask.

metadata list[dict] | None

Per-subtask metadata. Defaults to None.

num_subtasks property

num_subtasks: int

Number of subtasks in this task's definition.

get_arguments

get_arguments() -> list[dict]

Return per-subtask argument dictionaries.

Returns:

Type Description
list[dict]

list[dict]: One argument dictionary per subtask.

Source code in src/bloqade/core/device/task.py
367
368
def get_arguments(self) -> list[dict]:
    """Return the argument dictionaries of the parameter scan.

    Returns:
        list[dict]: The stored ``arguments`` attribute, unchanged.
    """
    scan_arguments = self.arguments
    return scan_arguments

get_kernels

get_kernels() -> list[ir.Method]

Return the kernels used to build the task's programs.

Returns:

Type Description
list[Method]

list[ir.Method]: Kernels in program-index order.

Source code in src/bloqade/core/device/task.py
364
365
def get_kernels(self) -> list[ir.Method]:
    """Return the scanned kernel wrapped in a one-element list.

    Returns:
        list[ir.Method]: A new list containing only ``self.kernel``.
    """
    scanned_kernel = self.kernel
    return [scanned_kernel]

get_metadata

get_metadata() -> list[dict] | None

Return per-subtask metadata dictionaries.

Returns:

Type Description
list[dict] | None

list[dict] | None: One metadata dictionary per subtask, or None when no metadata is set.

Source code in src/bloqade/core/device/task.py
375
376
def get_metadata(self) -> list[dict] | None:
    return self.metadata

get_num_shots

get_num_shots() -> list[int]

Return the per-subtask shot counts.

Returns:

Type Description
list[int]

list[int]: Shot count for each subtask, in subtask order.

Source code in src/bloqade/core/device/task.py
370
371
372
373
def get_num_shots(self) -> list[int]:
    """Return the shot count for each subtask.

    Returns:
        list[int]: ``num_shots`` itself when it is already a list;
            otherwise the single integer repeated ``num_subtasks`` times.
    """
    if not isinstance(self.num_shots, int):
        # Already one entry per subtask; hand it back as-is.
        return self.num_shots
    return [self.num_shots for _ in range(self.num_subtasks)]

program_index_for_subtask

program_index_for_subtask(i: int) -> int

Return the program index used by subtask i.

Parameter-scan tasks reuse a single program, so this override returns program index 0 for every subtask. The default implementation in TaskABC instead maps each subtask to its own program.

Parameters:

Name Type Description Default
i int

Subtask index.

required

Returns:

Name Type Description
int int

Program index.

Source code in src/bloqade/core/device/task.py
378
379
def program_index_for_subtask(self, i: int) -> int:
    """Return the program index used by subtask `i`.

    Every subtask maps to program index 0, regardless of `i`.

    Args:
        i (int): Subtask index (not used in the result).

    Returns:
        int: Always 0.
    """
    shared_program_index = 0
    return shared_program_index

summary

summary() -> str

Return a human-readable summary printed on dry-run.

Returns:

Name Type Description
str str

Summary describing what would be submitted.

Source code in src/bloqade/core/device/task.py
381
382
383
384
385
386
387
388
def summary(self) -> str:
    """Return a human-readable summary printed on dry-run.

    Returns:
        str: Banner-delimited text stating the number of subtasks and the
            name of the scanned kernel.
    """
    banner = "=" * 60
    parts = [
        banner + "\n",
        "DRY RUN -- NO PROGRAM WAS ACTUALLY SUBMITTED FOR EXECUTION\n",
        f"Would now submit a task containing {self.num_subtasks} subtasks.\n",
        f"These subtasks correspond to parameter sets of the kernel {self.kernel.sym_name}.\n",
        "Set dry_run=False to actually execute the parameter scan.\n",
        banner,
    ]
    return "".join(parts)

SingleKernelTask dataclass

SingleKernelTask(
    *,
    context_name: str,
    program_language: str,
    language_version: str = "0.1.0",
    future_cls: type[FutureType] = Future,
    kernel: Method,
    arguments: dict | None = None,
    num_shots: int,
    metadata: dict | None = None
)

Bases: TaskABC[FutureType]


              flowchart TD
              bloqade.core.device.task.SingleKernelTask[SingleKernelTask]
              bloqade.core.device.task.TaskABC[TaskABC]
              bloqade.core.device.mixins.AuthMixin[AuthMixin]

                              bloqade.core.device.task.TaskABC --> bloqade.core.device.task.SingleKernelTask
                                bloqade.core.device.mixins.AuthMixin --> bloqade.core.device.task.TaskABC
                



              click bloqade.core.device.task.SingleKernelTask href "" "bloqade.core.device.task.SingleKernelTask"
              click bloqade.core.device.task.TaskABC href "" "bloqade.core.device.task.TaskABC"
              click bloqade.core.device.mixins.AuthMixin href "" "bloqade.core.device.mixins.AuthMixin"
            

Task that runs a single kernel with one set of arguments.

Attributes:

Name Type Description
kernel Method

Kernel to execute.

arguments dict | None

Arguments for the kernel. Defaults to None.

num_shots int

Shot count for the kernel.

metadata dict | None

Metadata for the single subtask. Defaults to None.

num_subtasks property

num_subtasks: int

Number of subtasks in this task's definition.

get_arguments

get_arguments() -> list[dict] | None

Return per-subtask argument dictionaries.

Returns:

Type Description
list[dict] | None

list[dict] | None: One argument dictionary per subtask, or None when no arguments are set.

Source code in src/bloqade/core/device/task.py
416
417
418
def get_arguments(self) -> list[dict] | None:
    if self.arguments is not None:
        return [self.arguments]

get_kernels

get_kernels() -> list[ir.Method]

Return the kernels used to build the task's programs.

Returns:

Type Description
list[Method]

list[ir.Method]: Kernels in program-index order.

Source code in src/bloqade/core/device/task.py
413
414
def get_kernels(self) -> list[ir.Method]:
    """Return the single kernel wrapped in a one-element list.

    Returns:
        list[ir.Method]: A new list containing only ``self.kernel``.
    """
    only_kernel = self.kernel
    return [only_kernel]

get_metadata

get_metadata() -> list[dict] | None

Return per-subtask metadata dictionaries.

Returns:

Type Description
list[dict] | None

list[dict] | None: One metadata dictionary per subtask, or None when no metadata is set.

Source code in src/bloqade/core/device/task.py
423
424
425
def get_metadata(self) -> list[dict] | None:
    if self.metadata is not None:
        return [self.metadata]

get_num_shots

get_num_shots() -> list[int]

Return the per-subtask shot counts.

Returns:

Type Description
list[int]

list[int]: Shot count for each subtask, in subtask order.

Source code in src/bloqade/core/device/task.py
420
421
def get_num_shots(self) -> list[int]:
    """Return the shot count as a one-element list.

    Returns:
        list[int]: A new list containing only ``self.num_shots``.
    """
    shots = self.num_shots
    return [shots]

summary

summary() -> str

Return a human-readable summary printed on dry-run.

Returns:

Name Type Description
str str

Summary describing what would be submitted.

Source code in src/bloqade/core/device/task.py
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
def summary(self) -> str:
    """Return a human-readable summary printed on dry-run.

    Returns:
        str: Banner-delimited text showing the kernel call as
            ``name(key=value, ...)`` together with its shot count.
    """
    banner = "=" * 60

    # Render arguments as "key=value" pairs; no arguments gives "name()".
    pairs = (
        [f"{key}={value}" for key, value in self.arguments.items()]
        if self.arguments is not None
        else []
    )
    formatted_arguments = ", ".join(pairs)
    kernel_print = f"{self.kernel.sym_name}({formatted_arguments})"

    if isinstance(self.num_shots, int):
        shots = self.num_shots
    else:
        # Fall back to the first entry if a sequence was supplied.
        shots = self.num_shots[0]

    pieces = [
        banner + "\n",
        "DRY RUN -- NO PROGRAM WAS ACTUALLY SUBMITTED FOR EXECUTION\n",
        "Would now submit a task containing a single subtask for the kernel:\n",
        f"  * {kernel_print} - {shots} shots\n",
        "Set dry_run=False to actually execute this kernel.\n",
        banner,
    ]
    return "".join(pieces)

TaskABC dataclass

TaskABC(
    *,
    context_name: str,
    program_language: str,
    language_version: str = "0.1.0",
    future_cls: type[FutureType] = Future
)

Bases: Generic[FutureType], AuthMixin, ABC


              flowchart TD
              bloqade.core.device.task.TaskABC[TaskABC]
              bloqade.core.device.mixins.AuthMixin[AuthMixin]

                              bloqade.core.device.mixins.AuthMixin --> bloqade.core.device.task.TaskABC
                


              click bloqade.core.device.task.TaskABC href "" "bloqade.core.device.task.TaskABC"
              click bloqade.core.device.mixins.AuthMixin href "" "bloqade.core.device.mixins.AuthMixin"
            

Abstract base class for kernel tasks.

A task collects one or more kernels and per-subtask metadata into a TaskDefinition that can be dry-run or submitted to the backend.

Attributes:

Name Type Description
program_language str

Program language identifier stored on the task definition and used when serializing kernels.

language_version str

Program language version stored on the task definition and used when serializing kernels. Must be a semantic version. Set this directly for a static version, or override the program_language_version property if the version needs additional logic. Defaults to "0.1.0".

future_cls type[FutureType]

Future class used to construct the return value of submit_task_definition. Defaults to Future.

num_subtasks abstractmethod property

num_subtasks: int

Number of subtasks in this task's definition.

program_language_version property

program_language_version: str

Program language version recorded when serializing kernels.

Defaults to the language_version attribute. Override this property in a subclass if the version needs to be computed with additional logic. The value must be a semantic version.

Returns:

Name Type Description
str str

Semantic version string.

create_task_definition

create_task_definition() -> TaskDefinition

Build a TaskDefinition from this task's kernels and subtasks.

Override this method directly if your use-case doesn't fit the API contract.

Returns:

Name Type Description
TaskDefinition TaskDefinition

Definition ready to be submitted.

Source code in src/bloqade/core/device/task.py
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
def create_task_definition(self) -> TaskDefinition:
    """Build a `TaskDefinition` from this task's kernels and subtasks.

    One `Subtask` is created per index in ``range(self.num_subtasks)``,
    taking its program index from `program_index_for_subtask`, its shot
    count from `get_num_shots`, and its arguments and metadata (when
    present) from `get_arguments` and `get_metadata`.

    Override this method directly if your use-case doesn't fit the API
    contract.

    Returns:
        TaskDefinition: Definition ready to be submitted.
    """
    programs = self.programs()
    shots_per_subtask = self.get_num_shots()
    all_arguments = self.get_arguments()
    all_metadata = self.get_metadata()

    subtasks = []
    for index in range(self.num_subtasks):
        subtask_arguments = all_arguments[index] if all_arguments is not None else None

        # User metadata travels as a JSON string inside TaskMetadata.
        subtask_metadata = (
            TaskMetadata(user_metadata=json.dumps(all_metadata[index]))
            if all_metadata is not None
            else None
        )

        subtasks.append(
            Subtask(
                program_index=self.program_index_for_subtask(index),
                num_shots=shots_per_subtask[index],
                arguments=subtask_arguments,
                subtask_metadata=subtask_metadata,
            )
        )

    # A leading "v" on the version is stripped so the result reads "<language>.v<version>".
    bare_version = self.program_language_version.removeprefix("v")
    return TaskDefinition(
        program_language=f"{self.program_language}.v{bare_version}",
        programs=programs,
        subtasks=subtasks,
    )

get_arguments abstractmethod

get_arguments() -> list[dict] | None

Return per-subtask argument dictionaries.

Returns:

Type Description
list[dict] | None

list[dict] | None: One argument dictionary per subtask, or None when no arguments are set.

Source code in src/bloqade/core/device/task.py
126
127
128
129
130
131
132
133
134
@abstractmethod
def get_arguments(self) -> list[dict] | None:
    """Return per-subtask argument dictionaries.

    Returns:
        list[dict] | None: One argument dictionary per subtask, or None
            when no arguments are set.
    """
    ...

get_kernels abstractmethod

get_kernels() -> list[ir.Method]

Return the kernels used to build the task's programs.

Returns:

Type Description
list[Method]

list[ir.Method]: Kernels in program-index order.

Source code in src/bloqade/core/device/task.py
117
118
119
120
121
122
123
124
@abstractmethod
def get_kernels(self) -> list[ir.Method]:
    """Return the kernels used to build the task's programs.

    Abstract hook with no implementation here.

    Returns:
        list[ir.Method]: Kernels in program-index order.
    """

get_metadata abstractmethod

get_metadata() -> list[dict] | None

Return per-subtask metadata dictionaries.

Returns:

Type Description
list[dict] | None

list[dict] | None: One metadata dictionary per subtask, or None when no metadata is set.

Source code in src/bloqade/core/device/task.py
136
137
138
139
140
141
142
143
144
@abstractmethod
def get_metadata(self) -> list[dict] | None:
    """Return per-subtask metadata dictionaries.

    Returns:
        list[dict] | None: One metadata dictionary per subtask, or None
            when no metadata is set.
    """
    ...

get_num_shots abstractmethod

get_num_shots() -> list[int]

Return the per-subtask shot counts.

Returns:

Type Description
list[int]

list[int]: Shot count for each subtask, in subtask order.

Source code in src/bloqade/core/device/task.py
146
147
148
149
150
151
152
153
@abstractmethod
def get_num_shots(self) -> list[int]:
    """Return the per-subtask shot counts.

    Abstract hook with no implementation here.

    Returns:
        list[int]: Shot count for each subtask, in subtask order.
    """

program_index_for_subtask

program_index_for_subtask(i: int) -> int

Return the program index used by subtask i.

The default implementation maps each subtask to its own program. Parameter-scan tasks override this to reuse a single program.

Parameters:

Name Type Description Default
i int

Subtask index.

required

Returns:

Name Type Description
int int

Program index.

Source code in src/bloqade/core/device/task.py
168
169
170
171
172
173
174
175
176
177
178
179
180
def program_index_for_subtask(self, i: int) -> int:
    """Return the program index used by subtask `i`.

    This default is the identity mapping: subtask `i` uses program `i`.
    Parameter-scan tasks override it to reuse a single program.

    Args:
        i (int): Subtask index.

    Returns:
        int: Program index, equal to `i`.
    """
    program_index = i
    return program_index

programs

programs() -> list[Program]

Build the program list for the task definition.

Returns:

Type Description
list[Program]

list[Program]: One Program per kernel returned by get_kernels, serialized via serialize_kernel.

Source code in src/bloqade/core/device/task.py
155
156
157
158
159
160
161
162
163
164
165
166
def programs(self) -> list[Program]:
    """Build the program list for the task definition.

    Returns:
        list[Program]: One `Program` per kernel returned by
            `get_kernels`, in the same order, with each kernel's
            `serialize_kernel` output as its content.
    """
    return [
        Program(content=self.serialize_kernel(kernel))
        for kernel in self.get_kernels()
    ]

run_async

run_async(
    *,
    dry_run: Literal[True],
    storage: StorageBackend | None = None,
    fetch_options: ApiFetchOptions = ApiFetchOptions()
) -> None
run_async(
    *,
    dry_run: Literal[False],
    storage: StorageBackend | None = None,
    fetch_options: ApiFetchOptions = ApiFetchOptions()
) -> FutureType
run_async(
    *,
    dry_run: bool,
    storage: StorageBackend | None = None,
    fetch_options: ApiFetchOptions = ApiFetchOptions()
) -> FutureType | None

Validate the task and either dry-run or submit it.

Other Parameters:

Name Type Description
dry_run bool

When True, print a summary and return None. When False, submit the task and return a future.

storage StorageBackend | None

Storage backend that will receive the task definition and later fetched shots. When None, a fresh DictStorage is used (in-memory; not persisted across processes). Defaults to None.

fetch_options ApiFetchOptions

Pagination and polling options attached to the returned future. Defaults to ApiFetchOptions().

Returns:

Type Description
FutureType | None

FutureType | None: Future attached to the submitted task when dry_run is False; otherwise None.

Raises:

Type Description
ValueError

If argument, metadata, or shot-count lengths do not match num_subtasks.

Source code in src/bloqade/core/device/task.py
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
def run_async(
    self,
    *,
    dry_run: bool,
    storage: StorageBackend | None = None,
    fetch_options: ApiFetchOptions = ApiFetchOptions(),
) -> FutureType | None:
    """Validate the task and either dry-run or submit it.

    Keyword Args:
        dry_run (bool): When True, print a summary and return None.
            When False, submit the task and return a future.
        storage (StorageBackend | None): Storage backend forwarded to
            `submit_task_definition`. Defaults to None.
        fetch_options (ApiFetchOptions): Options forwarded to
            `submit_task_definition`. Defaults to `ApiFetchOptions()`.

    Returns:
        FutureType | None: Result of `submit_task_definition` when
            `dry_run` is False; otherwise None.

    Raises:
        ValueError: Propagated from `validate_arguments` when the task's
            inputs are inconsistent.
    """
    # NOTE(review): the `fetch_options` default is created once, at function
    # definition time, and shared by all calls -- confirm ApiFetchOptions is
    # never mutated.
    self.validate_arguments()

    # The definition is built before the dry-run check, so errors raised
    # while building it surface even when nothing is submitted.
    task_definition = self.create_task_definition()

    if not dry_run:
        return self.submit_task_definition(
            task_definition=task_definition,
            storage=storage,
            fetch_options=fetch_options,
        )

    print(self.summary())
    return None

serialize_kernel

serialize_kernel(kernel: Method) -> str

Serialize a kernel into a string suitable for the backend.

Parameters:

Name Type Description Default
kernel Method

Kernel to serialize.

required

Returns:

Name Type Description
str str

Serialized kernel content.

Source code in src/bloqade/core/device/task.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
def serialize_kernel(self, kernel: ir.Method) -> str:
    """Serialize a kernel into a string suitable for the backend.

    The kernel is encoded by its own dialect group, tagged with this
    task's `program_language_version`, and the encoded module is then
    passed through `json_serializer`.

    Args:
        kernel (ir.Method): Kernel to serialize.

    Returns:
        str: Serialized kernel content.
    """
    module = kernel.dialects.encode(kernel, version=self.program_language_version)
    serialized = json_serializer.encode(module)
    return serialized

submit_task_definition

submit_task_definition(
    *,
    task_definition: TaskDefinition,
    storage: StorageBackend | None = None,
    fetch_options: ApiFetchOptions = ApiFetchOptions()
) -> FutureType

Submit a prepared task definition and return a future.

Other Parameters:

Name Type Description
task_definition TaskDefinition

Task definition to submit.

storage StorageBackend | None

Storage backend that will receive the task definition. When None, a fresh DictStorage is used (in-memory; not persisted across processes). Defaults to None.

fetch_options ApiFetchOptions

Pagination and polling options attached to the returned future. Defaults to ApiFetchOptions().

Returns:

Name Type Description
FutureType FutureType

Future attached to the created task ID.

Raises:

Type Description
ValueError

If the backend response is missing a task ID.

Source code in src/bloqade/core/device/task.py
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
def submit_task_definition(
    self,
    *,
    task_definition: TaskDefinition,
    storage: StorageBackend | None = None,
    fetch_options: ApiFetchOptions = ApiFetchOptions(),
) -> FutureType:
    """Submit a prepared task definition and return a future.

    Keyword Args:
        task_definition (TaskDefinition): Task definition to submit.
        storage (StorageBackend | None): Storage backend that records the
            task definition under the new task ID. When None, a fresh
            `DictStorage` is created for this call. Defaults to None.
        fetch_options (ApiFetchOptions): Options handed to the returned
            future. Defaults to `ApiFetchOptions()`.

    Returns:
        FutureType: `future_cls` instance bound to the created task ID.

    Raises:
        ValueError: If the created task has no (or an empty) ID.
    """
    target_storage = DictStorage() if storage is None else storage

    # Authenticate before any client is opened.
    self.authenticate()

    request = TaskCreationRequest(root=task_definition)
    with TasksClient(self.app_context) as client:
        created_task = client.create(body=request)  # type: ignore

    new_task_id = created_task.id
    if not new_task_id:
        raise ValueError(
            f"Couldn't get id of created task {created_task}. Please report this issue!"
        )

    logger.info(f"Submitted task with ID: {new_task_id}")

    # Record the definition under the new ID together with its creation date.
    target_storage.add_task_definition(
        new_task_id, task_definition, created_task.created_date
    )

    return self.future_cls(
        task_id=new_task_id,
        fetch_options=fetch_options,
        storage=target_storage,
        context_name=self.context_name,
    )

summary

summary() -> str

Return a human-readable summary printed on dry-run.

Returns:

Name Type Description
str str

Summary describing what would be submitted.

Source code in src/bloqade/core/device/task.py
84
85
86
87
88
89
90
def summary(self) -> str:
    """Return a human-readable summary printed on dry-run.

    This base implementation reports only the number of subtasks.

    Returns:
        str: Summary describing what would be submitted.
    """
    subtask_count = self.num_subtasks
    return f"Would now submit {subtask_count} subtasks"

validate_arguments

validate_arguments() -> None

Validate that argument, metadata, and shot-count lengths match the subtask count.

Raises:

Type Description
ValueError

If the arguments, metadata, or shot-count length differs from num_subtasks.

Source code in src/bloqade/core/device/task.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
def validate_arguments(self) -> None:
    """Validate that per-subtask inputs match the subtask count.

    Checks, in order, the lengths of the arguments (if any), the metadata
    (if any), and the shot counts against `num_subtasks`.

    Raises:
        ValueError: If the arguments, metadata, or shot-count length
            differs from `num_subtasks`.
    """
    args = self.get_arguments()
    if args is not None:
        if len(args) != self.num_subtasks:
            raise ValueError(
                f"Length mismatch: got {len(args)} sets of arguments for {self.num_subtasks} subtasks!"
            )

    meta = self.get_metadata()
    if meta is not None:
        if len(meta) != self.num_subtasks:
            raise ValueError(
                f"Length mismatch: got {len(meta)} sets of metadata for {self.num_subtasks} subtasks!"
            )

    # Shot counts are never None, so they are always checked.
    shots = self.get_num_shots()
    if len(shots) != self.num_subtasks:
        raise ValueError(
            f"Length mismatch: got {len(shots)} shot counts for {self.num_subtasks} subtasks!"
        )