
Future

ApiFetchOptions dataclass

ApiFetchOptions(
    subtasks_per_fetch: int = 10,
    shots_per_fetch: int = 100,
    poll_interval_initial: float = 0.5,
    poll_interval_max: float = 30.0,
    poll_interval_factor: float = 2.0,
)

Options controlling task polling and result pagination.

Attributes:

subtasks_per_fetch (int): Number of subtasks to request per results page. Defaults to 10.

shots_per_fetch (int): Number of shots to request per subtask shot page. Defaults to 100.

poll_interval_initial (float): Initial delay, in seconds, between status polls. Defaults to 0.5.

poll_interval_max (float): Maximum polling delay, in seconds. Defaults to 30.0.

poll_interval_factor (float): Multiplier applied to the polling delay after each non-terminal status. Defaults to 2.0.
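
Taken together, the three polling options describe a capped exponential backoff. The sketch below computes the delays such a schedule would produce under the documented defaults. `poll_delays` is a hypothetical helper, not part of the library, and the exact point at which the library applies the cap is an assumption.

```python
from itertools import islice
from typing import Iterator


def poll_delays(
    initial: float = 0.5,
    maximum: float = 30.0,
    factor: float = 2.0,
) -> Iterator[float]:
    """Yield successive polling delays (hypothetical helper, not library API).

    Assumes the delay starts at `initial`, is multiplied by `factor` after
    each non-terminal status, and never exceeds `maximum`.
    """
    delay = min(initial, maximum)
    while True:
        yield delay
        delay = min(delay * factor, maximum)


# First eight delays under the documented defaults.
schedule = list(islice(poll_delays(), 8))
print(schedule)
```

Under these assumptions the delay reaches the 30-second cap on the seventh poll, so a long-running task is polled roughly twice a minute after the first half minute.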

Future dataclass

Future(
    *,
    context_name: str,
    task_id: str,
    storage: StorageBackend = DictStorage(),
    fetch_options: ApiFetchOptions = ApiFetchOptions(),
    result_cls: type[ResultType] = Result
)

Bases: AuthMixin, Generic[ResultType]



Future for a submitted task.

A future can poll task status, fetch available shot results into storage, and construct result views over that storage using result_cls.

Attributes:

task_id (str): Backend task ID.

storage (StorageBackend): Storage backend used for fetched shots and task metadata. Defaults to a fresh DictStorage (in-memory; not persisted across processes).

fetch_options (ApiFetchOptions): Pagination and polling options.

cancel

cancel()

Attempt to cancel the execution of the task.

Returns:

The backend cancellation response when the cancellation request succeeds; otherwise None, in which case the exception is reported as a warning rather than raised.

Source code in src/bloqade/core/device/future.py
def cancel(self):
    """Attempt to cancel the execution of the task.

    Returns:
        The backend cancellation response when the cancellation request
        succeeds; otherwise None, in which case the exception is reported
        as a warning rather than raised.
    """

    self.authenticate()
    with TasksClient(self.app_context) as client:
        try:
            # NOTE: typing issue because client is seen as BaseClient instead of TasksClient
            return client.cancel(id=self.task_id)  # type: ignore
        except Exception as e:
            # Report the failure as a warning and return None instead of raising.
            warn(
                f"Exception encountered when trying to cancel task with ID {self.task_id}: {e!r}"
            )
            return None
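
Because cancel reports failures as warnings instead of raising, callers need to check for None. The following is a minimal sketch of that warn-and-return-None pattern using only the standard library. `cancel_or_warn` and `failing_call` are hypothetical stand-ins for the backend request, not library functions.

```python
import warnings
from typing import Any, Callable, Optional


def cancel_or_warn(cancel_call: Callable[[], Any], task_id: str) -> Optional[Any]:
    """Run a cancellation call; warn and return None if it raises.

    `cancel_call` stands in for the backend request and is hypothetical.
    """
    try:
        return cancel_call()
    except Exception as e:
        warnings.warn(f"Could not cancel task {task_id}: {e!r}")
        return None


def failing_call() -> None:
    """Simulate a backend request that fails."""
    raise RuntimeError("backend unavailable")


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    ok = cancel_or_warn(lambda: {"cancelled": True}, "task-1")
    failed = cancel_or_warn(failing_call, "task-2")

# Callers should treat None as "cancellation was not confirmed".
if failed is None:
    print(f"cancellation not confirmed; {len(caught)} warning(s) emitted")
```

A None return only means the request did not go through; it says nothing about the task's current state, which still has to be read from the backend.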

cancelled

cancelled() -> bool

Return whether the task was cancelled.

Returns:

bool: True if the task status is CANCELLED.

Source code in src/bloqade/core/device/future.py
def cancelled(self) -> bool:
    """Return whether the task was cancelled.

    Returns:
        bool: `True` if the task status is `CANCELLED`.
    """
    return self.status() == TaskStatus.CANCELLED

done

done() -> bool

Return whether the task has reached a terminal status.

Returns:

bool: True if the task status is one of EXIT_STATUS (completed, cancelled, failed, or payload processing error).

Source code in src/bloqade/core/device/future.py
def done(self) -> bool:
    """Return whether the task has reached a terminal status.

    Returns:
        bool: `True` if the task status is one of `EXIT_STATUS`
            (completed, cancelled, failed, or payload processing error).
    """
    return self.status() in self.EXIT_STATUS
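
A terminal status is not the same as success: a cancelled or failed task is also "done". The sketch below illustrates that distinction with a stand-in enum. The member names and the `describe` helper are assumptions made for illustration and are not taken from the library.

```python
from enum import Enum


class TaskStatus(Enum):
    """Stand-in for the library's status enum; member names are assumptions."""

    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    CANCELLED = "CANCELLED"
    FAILED = "FAILED"
    PAYLOAD_PROCESSING_ERROR = "PAYLOAD_PROCESSING_ERROR"


# Terminal statuses, mirroring the four cases listed in the docstring above.
EXIT_STATUS = frozenset(
    {
        TaskStatus.COMPLETED,
        TaskStatus.CANCELLED,
        TaskStatus.FAILED,
        TaskStatus.PAYLOAD_PROCESSING_ERROR,
    }
)


def describe(status: TaskStatus) -> str:
    """Classify a single status snapshot without re-querying anything."""
    if status not in EXIT_STATUS:
        return "pending"
    if status == TaskStatus.COMPLETED:
        return "succeeded"
    return "ended unsuccessfully"


print(describe(TaskStatus.CANCELLED))
```

In the source shown on this page, done, cancelled, and status all go through get_task, so each call issues a backend request. Reading status once and classifying that snapshot avoids repeated round trips.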

export_to

export_to(
    storage: StorageBackend,
    chunk_size: int = 1000,
    shot_filter: ShotFilter | None = None,
)

Copy stored shots and task definitions to another storage backend.

Parameters:

storage (StorageBackend, required): Destination storage backend.

chunk_size (int): Maximum number of shots to write per batch. Defaults to 1000.

shot_filter (ShotFilter | None): Optional shot filter. When the filter includes task_ids, only those task definitions are copied. Otherwise, all task definitions from this future's storage are copied. Defaults to None.

Source code in src/bloqade/core/device/future.py
def export_to(
    self,
    storage: StorageBackend,
    chunk_size: int = 1000,
    shot_filter: ShotFilter | None = None,
):
    """Copy stored shots and task definitions to another storage backend.

    Args:
        storage (StorageBackend): Destination storage backend.
        chunk_size (int): Maximum number of shots to write per batch.
            Defaults to 1000.
        shot_filter (ShotFilter | None): Optional shot filter. When the
            filter includes `task_ids`, only those task definitions are
            copied. Otherwise, all task definitions from this future's
            storage are copied. Defaults to None.
    """
    chunk = []
    for shot in self.storage.get_shots(shot_filter=shot_filter):
        chunk.append(shot)
        if len(chunk) >= chunk_size:
            storage.add_shots(chunk)
            chunk = []
    if chunk:
        storage.add_shots(chunk)

    if shot_filter is not None and shot_filter.task_ids is not None:
        task_ids = shot_filter.task_ids
    else:
        task_ids = self.storage.task_ids()

    for task_id in task_ids:
        task_def = self.storage.get_task_definition(task_id)
        creation_time = self.storage.get_task_creation_time(task_id)
        storage.add_task_definition(task_id, task_def, creation_time)
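
The batching loop at the top of export_to can be isolated as a small generator. This is a sketch of the same accumulate-and-flush technique, not a library function. `chunked` is a hypothetical name, and the guard against a non-positive chunk_size is an addition that the source above does not have.

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunked(items: Iterable[T], chunk_size: int) -> Iterator[List[T]]:
    """Yield lists of at most `chunk_size` items, preserving order."""
    if chunk_size < 1:
        # Added guard (assumption): the source does not validate chunk_size.
        raise ValueError("chunk_size must be at least 1")
    chunk: List[T] = []
    for item in items:
        chunk.append(item)
        if len(chunk) >= chunk_size:
            yield chunk
            chunk = []
    if chunk:
        # Flush the final, possibly shorter, batch.
        yield chunk


batches = list(chunked(range(7), chunk_size=3))
print(batches)
```

Writing in batches bounds the memory held at any one time to chunk_size shots, which matters when the source storage streams a large number of rows.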

fetch

fetch() -> None

Fetch currently available shot results into this future's storage.

Results are requested from the first known incomplete subtask page and paginated by both subtask and shot page. Repeated calls are safe because storage backends de-duplicate rows by task ID, shot index, and frame type.

Source code in src/bloqade/core/device/future.py
def fetch(self) -> None:
    """Fetch currently available shot results into this future's storage.

    Results are requested from the first known incomplete subtask page and
    paginated by both subtask and shot page. Repeated calls are safe because
    storage backends de-duplicate rows by task ID, shot index, and frame
    type.
    """
    self.authenticate()
    subtask_page = self._first_incomplete_subtask_page
    done = False

    with ResultsClient(self.app_context) as client:
        while not done:
            done = self._fetch_subtask_page(
                client=client,  # type: ignore
                subtask_page=subtask_page,
            )

            subtask_page += 1
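
The docstring states that repeated fetches are safe because storage de-duplicates rows by task ID, shot index, and frame type. A toy store keyed on that triple shows why a second write of the same page changes nothing. `KeyedShotStore` and the dictionary layout of a shot are hypothetical and do not reflect the library's DictStorage internals.

```python
from typing import Dict, Iterable, Tuple

# (task_id, shot_index, frame_type)
ShotKey = Tuple[str, int, str]


class KeyedShotStore:
    """Toy in-memory store (hypothetical, not the library's DictStorage)."""

    def __init__(self) -> None:
        self._rows: Dict[ShotKey, dict] = {}

    def add_shots(self, shots: Iterable[dict]) -> None:
        # Writing the same key twice overwrites the row instead of adding one.
        for shot in shots:
            key = (shot["task_id"], shot["shot_index"], shot["frame_type"])
            self._rows[key] = shot

    def __len__(self) -> int:
        return len(self._rows)


page = [
    {"task_id": "t1", "shot_index": 0, "frame_type": "DETECTED", "bits": [0, 1]},
    {"task_id": "t1", "shot_index": 1, "frame_type": "DETECTED", "bits": [1, 1]},
]

store = KeyedShotStore()
store.add_shots(page)
store.add_shots(page)  # simulates a repeated fetch of the same page
print(len(store))
```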

fetch_and_export_to

fetch_and_export_to(
    storage: StorageBackend, chunk_size: int = 1000
)

Fetch available results and export this future's storage.

Parameters:

storage (StorageBackend, required): Destination storage backend.

chunk_size (int): Maximum number of shots to write per batch. Defaults to 1000.

Source code in src/bloqade/core/device/future.py
def fetch_and_export_to(self, storage: StorageBackend, chunk_size: int = 1000):
    """Fetch available results and export this future's storage.

    Args:
        storage (StorageBackend): Destination storage backend.
        chunk_size (int): Maximum number of shots to write per batch.
            Defaults to 1000.
    """
    self.fetch()
    self.export_to(storage, chunk_size=chunk_size)

from_storage classmethod

from_storage(
    *,
    storage: StorageBackend,
    new_storage: StorageBackend | None = None,
    task_id: str | None = None,
    fetch_options: ApiFetchOptions = ApiFetchOptions(),
    context_name: str | None = None
) -> Self

Create a future from task metadata already present in storage.

Parameters:

storage (StorageBackend, required): Storage used to discover and validate the task ID.

new_storage (StorageBackend | None): Storage backend to attach to the returned future. When None, storage is reused. Defaults to None.

task_id (str | None): Task ID to attach to. Required when storage contains more than one task ID. Defaults to None.

fetch_options (ApiFetchOptions): Pagination and polling options. Defaults to ApiFetchOptions().

context_name (str | None): Name of the qlam context to attach to the returned future. When None, the class-level default on cls is used. Defaults to None.

Returns:

Self: A future attached to the selected task ID.

Raises:

ValueError: If storage has no task IDs, the requested task ID is not present, multiple task IDs are present without an explicit task_id, or context_name is None and cls has no class-level default.

Source code in src/bloqade/core/device/future.py
@classmethod
def from_storage(
    cls,
    *,
    storage: StorageBackend,
    new_storage: StorageBackend | None = None,
    task_id: str | None = None,
    fetch_options: ApiFetchOptions = ApiFetchOptions(),
    context_name: str | None = None,
) -> Self:
    """Create a future from task metadata already present in storage.

    Args:
        storage (StorageBackend): Storage used to discover and validate the
            task ID.
        new_storage (StorageBackend | None): Storage backend to attach to
            the returned future. When None, `storage` is reused. Defaults to
            None.
        task_id (str | None): Task ID to attach to. Required when `storage`
            contains more than one task ID. Defaults to None.
        fetch_options (ApiFetchOptions): Pagination and polling options.
            Defaults to `ApiFetchOptions()`.
        context_name (str | None): Name of the qlam context to attach to
            the returned future. When None, the class-level default on
            `cls` is used. Defaults to None.

    Returns:
        Self: A future attached to the selected task ID.

    Raises:
        ValueError: If storage has no task IDs, the requested task ID is not
            present, multiple task IDs are present without an explicit
            `task_id`, or `context_name` is None and `cls` has no
            class-level default.
    """
    context_name = cls._resolve_context_name(context_name)

    if new_storage is None:
        new_storage = storage

    task_ids_at_storage = storage.task_ids()

    if not task_ids_at_storage:
        raise ValueError("Found no task IDs in storage.")

    if task_id is not None and task_id not in task_ids_at_storage:
        raise ValueError(f"Task with ID {task_id} not found at storage {storage}")

    if len(task_ids_at_storage) > 1 and task_id is None:
        msg = "More than one task ID found! Please specify a task_id. Candidates are:\n"
        for candidate_task_id in task_ids_at_storage:
            creation_time = storage.get_task_creation_time(candidate_task_id)
            msg += f"  * {candidate_task_id}, created at {creation_time}\n"
        raise ValueError(msg)

    if task_id is None:
        task_id = task_ids_at_storage.pop()

    return cls(
        task_id=task_id,
        storage=new_storage,
        fetch_options=fetch_options,
        result_cls=cls.result_cls,
        context_name=context_name,
    )
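
The task ID selection rules in from_storage can be read as a small pure function. The sketch below reproduces the three ValueError cases from the source above. `select_task_id` is a hypothetical name, and unlike the source it uses `next(iter(...))` instead of `pop()` so the input collection is left unmodified.

```python
from typing import Collection, Optional


def select_task_id(stored_ids: Collection[str], task_id: Optional[str] = None) -> str:
    """Pick the task ID to attach to, following the rules described above."""
    if not stored_ids:
        raise ValueError("Found no task IDs in storage.")
    if task_id is not None:
        if task_id not in stored_ids:
            raise ValueError(f"Task with ID {task_id} not found in storage.")
        return task_id
    if len(stored_ids) > 1:
        candidates = ", ".join(sorted(stored_ids))
        raise ValueError(f"More than one task ID found; specify one of: {candidates}")
    # Exactly one ID is stored, so it is the only possible choice.
    return next(iter(stored_ids))


print(select_task_id({"task-a"}))
print(select_task_id({"task-a", "task-b"}, task_id="task-b"))
```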

from_task_id classmethod

from_task_id(
    *,
    task_id: str,
    storage: StorageBackend | None = None,
    fetch_options: ApiFetchOptions = ApiFetchOptions(),
    context_name: str | None = None
) -> Self

Create a future from a backend task ID.

The task record and task definition are fetched from the backend, and the task definition is stored in storage before the future is returned.

Parameters:

task_id (str, required): Backend task ID.

storage (StorageBackend | None): Storage backend that will receive the task definition and later fetched shots. When None, a fresh DictStorage is used (in-memory; not persisted across processes). Defaults to None.

fetch_options (ApiFetchOptions): Pagination and polling options. Defaults to ApiFetchOptions().

context_name (str | None): Name of the qlam context used to fetch the task and attached to the returned future. When None, the class-level default on cls is used. Defaults to None.

Returns:

Self: A future attached to task_id.

Raises:

ValueError: If context_name is None and cls has no class-level default.

Source code in src/bloqade/core/device/future.py
@classmethod
def from_task_id(
    cls,
    *,
    task_id: str,
    storage: StorageBackend | None = None,
    fetch_options: ApiFetchOptions = ApiFetchOptions(),
    context_name: str | None = None,
) -> Self:
    """Create a future from a backend task ID.

    The task record and task definition are fetched from the backend, and
    the task definition is stored in `storage` before the future is
    returned.

    Args:
        task_id (str): Backend task ID.
        storage (StorageBackend | None): Storage backend that will receive
            the task definition and later fetched shots. When None, a fresh
            `DictStorage` is used (in-memory; not persisted across
            processes). Defaults to None.
        fetch_options (ApiFetchOptions): Pagination and polling options.
            Defaults to `ApiFetchOptions()`.
        context_name (str | None): Name of the qlam context used to fetch
            the task and attached to the returned future. When None, the
            class-level default on `cls` is used. Defaults to None.

    Returns:
        Self: A future attached to `task_id`.

    Raises:
        ValueError: If `context_name` is None and `cls` has no
            class-level default.
    """
    if storage is None:
        storage = DictStorage()

    context_name = cls._resolve_context_name(context_name)
    auth = AuthMixin(context_name=context_name)
    auth.authenticate()
    with TasksClient(auth.app_context) as client:
        task = client.get(id=task_id)  # type: ignore

    # fetch the task definition for metadata
    with DefinitionsClient(auth.app_context) as client:
        task_def = client.get(id=task.definition_id)  # type: ignore

    storage.add_task_definition(
        task_id, task_definition=task_def, creation_time=task.created_date
    )

    return cls(
        task_id=task_id,
        storage=storage,
        fetch_options=fetch_options,
        result_cls=cls.result_cls,
        context_name=context_name,
    )
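
Both classmethods resolve context_name through `_resolve_context_name`, whose body is not shown on this page. The sketch below is one plausible reading of the documented behavior: an explicit argument wins, otherwise a class-level default is used, otherwise ValueError is raised. The attribute name `default_context_name` and both classes are hypothetical.

```python
from typing import ClassVar, Optional


class ContextDefaults:
    """Hypothetical base class; the attribute name is an assumption."""

    default_context_name: ClassVar[Optional[str]] = None

    @classmethod
    def resolve_context_name(cls, context_name: Optional[str]) -> str:
        # An explicit argument always takes precedence over the class default.
        if context_name is not None:
            return context_name
        if cls.default_context_name is None:
            raise ValueError(
                f"{cls.__name__} has no default context; pass context_name."
            )
        return cls.default_context_name


class LabFuture(ContextDefaults):
    """Subclass that pins a default context for all of its futures."""

    default_context_name = "lab"


print(LabFuture.resolve_context_name(None))
print(LabFuture.resolve_context_name("staging"))
```

Under this reading, a subclass that sets a default lets callers omit context_name, while the base class without a default forces them to pass it explicitly.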

get_compilation

get_compilation(compilation_id: str | None = None)

Fetch the compilation record associated with this task.

Parameters:

compilation_id (str | None): ID of the compilation to fetch. When None, the compilation ID is retrieved from the task record. Defaults to None.

Returns:

The compilation object returned by the backend.

Source code in src/bloqade/core/device/future.py
def get_compilation(self, compilation_id: str | None = None):
    """Fetch the compilation record associated with this task.

    Args:
        compilation_id (str | None): ID of the compilation to fetch. When
            `None`, the compilation ID is retrieved from the task record.
            Defaults to `None`.

    Returns:
        The compilation object returned by the backend.
    """
    self.authenticate()

    if compilation_id is None:
        compilation_id = self.get_task().compilation_id

    with CompilationsClient(self.app_context) as client:
        return client.get(id=compilation_id)  # type: ignore

get_task

get_task() -> Task

Fetch the current task record from the backend.

Returns:

Task: The task object returned by the backend.

Source code in src/bloqade/core/device/future.py
def get_task(self) -> "Task":
    """Fetch the current task record from the backend.

    Returns:
        Task: The task object returned by the backend.
    """
    self.authenticate()
    with TasksClient(self.app_context) as client:
        # NOTE: typing issue in qlam-core: every client is typed as BaseRestApi,
        # which does not declare `get`, even though the concrete client provides it.
        task = client.get(id=self.task_id)  # type: ignore
        logger.info(
            f"Fetched task with id {self.task_id}. Current status: {task.task_status}"
        )
        return task

partial_result

partial_result() -> ResultType

Fetch currently available results and return a result view.

Unlike result, this method does not wait for the task to finish.

Returns:

ResultType: A result view scoped to this future's task ID and the DETECTED frame type.

Source code in src/bloqade/core/device/future.py
def partial_result(self) -> ResultType:
    """Fetch currently available results and return a result view.

    Unlike `result`, this method does not wait for the task to finish.

    Returns:
        ResultType: A result view scoped to this future's task ID and the
            DETECTED frame type.
    """
    self.fetch()
    return self.results_from_storage()

result

result(*, timeout: float | None = None) -> ResultType

Wait for completion, fetch results, and return a result view.

Other Parameters:

timeout (float | None): Maximum number of seconds to wait for a terminal task status. If None, wait indefinitely. Defaults to None.

Returns:

ResultType: A result view scoped to this future's task ID and the DETECTED frame type.

Raises:

TimeoutError: If timeout elapses before a terminal status is reached.

ValueError: If the task was cancelled or failed.

Source code in src/bloqade/core/device/future.py
def result(
    self,
    *,
    timeout: float | None = None,
) -> ResultType:
    """Wait for completion, fetch results, and return a result view.

    Keyword Args:
        timeout (float | None): Maximum number of seconds to wait for a
            terminal task status. If None, wait indefinitely. Defaults to
            None.

    Returns:
        ResultType: A result view scoped to this future's task ID and the
            DETECTED frame type.

    Raises:
        TimeoutError: If `timeout` elapses before a terminal status is
            reached.
        ValueError: If the task was cancelled or failed.
    """
    self._wait_for_completion(timeout=timeout)
    self.fetch()

    return self.results_from_storage()
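
The body of `_wait_for_completion` is not shown on this page. The sketch below shows one way a wait loop with a timeout and capped backoff could be written, assuming the semantics described for timeout and for the ApiFetchOptions polling fields. `wait_until_done` and its injectable `sleep` and `clock` parameters are hypothetical, and the sketch does not reproduce the ValueError raised for cancelled or failed tasks.

```python
import time
from typing import Callable, List, Optional


def wait_until_done(
    is_done: Callable[[], bool],
    *,
    timeout: Optional[float] = None,
    initial: float = 0.5,
    maximum: float = 30.0,
    factor: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> None:
    """Poll `is_done` with capped exponential backoff (hypothetical helper)."""
    deadline = None if timeout is None else clock() + timeout
    delay = initial
    while not is_done():
        if deadline is None:
            sleep(delay)
        else:
            remaining = deadline - clock()
            if remaining <= 0:
                raise TimeoutError("task did not reach a terminal status in time")
            # Never sleep past the deadline.
            sleep(min(delay, remaining))
        delay = min(delay * factor, maximum)


# Demonstration with a recorded sleep instead of a real one.
sleeps: List[float] = []
polls = iter([False, False, True])
wait_until_done(lambda: next(polls), sleep=sleeps.append)
print(sleeps)
```

Injecting `sleep` and `clock` keeps the loop testable without real waiting; with the defaults it behaves like an ordinary blocking wait.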

results_from_storage

results_from_storage(
    shot_filter: ShotFilter | None = None,
) -> ResultType

Build a result view over this future's storage.

Parameters:

shot_filter (ShotFilter | None): Filter to apply to the result view. When None, the view is scoped to this future's task ID and the DETECTED frame type. Defaults to None.

Returns:

ResultType: A result view over the storage backend.

Source code in src/bloqade/core/device/future.py
def results_from_storage(self, shot_filter: ShotFilter | None = None) -> ResultType:
    """Build a result view over this future's storage.

    Args:
        shot_filter (ShotFilter | None): Filter to apply to the result view.
            When None, the view is scoped to this future's task ID and the
            DETECTED frame type. Defaults to None.

    Returns:
        ResultType: A result view over the storage backend.
    """
    if shot_filter is None:
        shot_filter = ShotFilter(task_ids=(self.task_id,), frame_type="DETECTED")
    return self.result_cls(
        storage=self.storage,
        shot_filter=shot_filter,
    )
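
One consequence of the code above is that a caller-supplied filter replaces the default entirely instead of being merged with it. The sketch below makes that visible with a stand-in `ShotFilter` that has only the two fields used on this page. The Optional defaults on those fields and the `effective_filter` helper are assumptions.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class ShotFilter:
    """Stand-in with only the two fields that appear in the source above."""

    task_ids: Optional[Tuple[str, ...]] = None
    frame_type: Optional[str] = None


def effective_filter(
    task_id: str, shot_filter: Optional[ShotFilter] = None
) -> ShotFilter:
    """Return the filter a result view would use (hypothetical helper)."""
    if shot_filter is None:
        # Default scope: this task only, DETECTED frames only.
        return ShotFilter(task_ids=(task_id,), frame_type="DETECTED")
    # A caller-supplied filter is used as-is, without adding the task ID.
    return shot_filter


default = effective_filter("t1")
custom = effective_filter("t1", ShotFilter(frame_type="DETECTED"))
print(default)
print(custom)
```

When the storage backend holds several tasks, a custom filter that omits task_ids would therefore not be limited to this future's task, so it is worth setting task_ids explicitly in that case.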

status

status() -> TaskStatus

Return the current status of the task.

Returns:

TaskStatus: The task status as reported by the backend.

Source code in src/bloqade/core/device/future.py
def status(self) -> TaskStatus:
    """Return the current status of the task.

    Returns:
        TaskStatus: The task status as reported by the backend.
    """
    return self.get_task().task_status