Exclusive

ExclusiveRemoteTask dataclass

ExclusiveRemoteTask(
    _task_ir,
    _metadata,
    _parallel_decoder,
    _http_handler=HTTPHandler(),
    _task_id=None,
    _task_result_ir=None,
)

Bases: CustomRemoteTaskABC

pull

pull(poll_interval=20)

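Block until the task result is available, then return the task itself. poll_interval is the time to wait between successive status queries; it is passed to time.sleep, so it is measured in seconds (default 20). Keep it relatively large, otherwise the server could be overloaded with queries. Raises ValueError if the task status is Unsubmitted.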

Source code in src/bloqade/analog/task/exclusive.py
def pull(self, poll_interval: float = 20):
    """
    Blocking pull to get the task result.
    poll_interval is the time interval to poll the task status.
    Please ensure that it is relatively large, otherwise
    the server could get overloaded with queries.
    """

    while True:
        if self._task_result_ir.task_status is QuEraTaskStatusCode.Unsubmitted:
            raise ValueError("Task ID not found.")

        if self._task_result_ir.task_status in [
            QuEraTaskStatusCode.Completed,
            QuEraTaskStatusCode.Partial,
            QuEraTaskStatusCode.Failed,
            QuEraTaskStatusCode.Unaccepted,
            QuEraTaskStatusCode.Cancelled,
        ]:
            return self

        status = self.status()
        if status in [QuEraTaskStatusCode.Completed, QuEraTaskStatusCode.Partial]:
            self._task_result_ir = self._http_handler.fetch_results(self._task_id)
            return self

        time.sleep(poll_interval)
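
The polling pattern used by pull can be tried without a server. The following is a minimal sketch, not the library's implementation: Status is a hypothetical stand-in for QuEraTaskStatusCode, poll_until_done and its injectable sleep argument are names invented for this example, and only the Completed/Partial exit branch of the loop above is reproduced.

```python
import enum
import time
from typing import Callable, Iterator, List, Tuple


class Status(enum.Enum):
    # Hypothetical stand-in for QuEraTaskStatusCode; only three members shown.
    Running = "Running"
    Completed = "Completed"
    Partial = "Partial"


# Statuses for which results can be fetched, as in the loop above.
DONE_WITH_RESULTS = {Status.Completed, Status.Partial}


def poll_until_done(
    get_status: Callable[[], Status],
    poll_interval: float = 20,
    sleep: Callable[[float], None] = time.sleep,
) -> Tuple[Status, int]:
    """Query get_status until results are available.

    Returns the final status and the number of status queries made.
    """
    queries = 0
    while True:
        status = get_status()
        queries += 1
        if status in DONE_WITH_RESULTS:
            return status, queries
        # Wait before asking again so the server is not flooded with queries.
        sleep(poll_interval)


# Simulated server: reports Running twice, then Completed.
_responses: Iterator[Status] = iter(
    [Status.Running, Status.Running, Status.Completed]
)
slept: List[float] = []  # records each wait instead of actually sleeping
final_status, n_queries = poll_until_done(
    lambda: next(_responses), poll_interval=20, sleep=slept.append
)
print(final_status.name, n_queries, slept)
```

Injecting sleep keeps the example fast to run; with the default time.sleep the same call would wait poll_interval seconds between queries, which is why a larger interval means fewer requests to the server.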

HTTPHandlerABC

fetch_results abstractmethod

fetch_results(task_id)

Fetch the task results from AirTable.

Parameters:

Name      Type    Description                   Default
task_id   str     The task id to be queried.    required

Returns:

response: The response from AirTable, used for error handling.

Source code in src/bloqade/analog/task/exclusive.py
@abc.abstractmethod
def fetch_results(self, task_id: str):
    """Fetch the task results from AirTable.

    Args:
        task_id: The task id to be queried.

    Returns:
        response: The response from AirTable, used for error handling.

    """

    ...

query_task_status abstractmethod

query_task_status(task_id)

Query the task status from AirTable.

Parameters:

Name      Type    Description                   Default
task_id   str     The task id to be queried.    required

Returns:

response: The response from AirTable, used for error handling.

Source code in src/bloqade/analog/task/exclusive.py
@abc.abstractmethod
def query_task_status(self, task_id: str):
    """Query the task status from AirTable.

    Args:
        task_id: The task id to be queried.

    Returns:
        response: The response from AirTable, used for error handling.

    """
    ...

submit_task_via_zapier abstractmethod

submit_task_via_zapier(task_ir, task_id)

Submit a task and add task_id to the task fields so that it can be queried later.

Parameters:

Name      Type                     Description                                    Default
task_ir   QuEraTaskSpecification   The task to be submitted.                      required
task_id   str                      The task id to be added to the task fields.    required

Returns:

response: The response from the Zapier webhook, used for error handling.

Source code in src/bloqade/analog/task/exclusive.py
@abc.abstractmethod
def submit_task_via_zapier(self, task_ir: QuEraTaskSpecification, task_id: str):
    """Submit a task and add task_id to the task fields for querying later.

    Args:
        task_ir: The task to be submitted.
        task_id: The task id to be added to the task fields.

    Returns:
        response: The response from the Zapier webhook, used for error handling.

    """
    ...
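
To illustrate how the three abstract methods fit together, here is a minimal in-memory sketch. It makes several assumptions: HandlerSketchABC is a hypothetical stand-in that only mirrors the method names of HTTPHandlerABC (it does not import bloqade), the string return values and status names are made up for illustration, and finish is a test-only helper that is not part of the interface. A handler of this shape could be useful for exercising submit/query/fetch logic without contacting Zapier or AirTable.

```python
import abc
from typing import Any, Dict


class HandlerSketchABC(abc.ABC):
    """Hypothetical stand-in with the same three method names as HTTPHandlerABC."""

    @abc.abstractmethod
    def submit_task_via_zapier(self, task_ir: Any, task_id: str): ...

    @abc.abstractmethod
    def query_task_status(self, task_id: str): ...

    @abc.abstractmethod
    def fetch_results(self, task_id: str): ...


class InMemoryHandler(HandlerSketchABC):
    """Stores tasks in a dict instead of calling Zapier or AirTable."""

    def __init__(self) -> None:
        self._records: Dict[str, Dict[str, Any]] = {}

    def submit_task_via_zapier(self, task_ir: Any, task_id: str) -> str:
        # Record the task under task_id so it can be queried later.
        self._records[task_id] = {
            "task_ir": task_ir,
            "status": "Enqueued",
            "results": None,
        }
        return "submitted"  # assumed return value, for illustration only

    def query_task_status(self, task_id: str) -> str:
        record = self._records.get(task_id)
        return "Not Found" if record is None else record["status"]

    def fetch_results(self, task_id: str) -> Any:
        return self._records[task_id]["results"]

    def finish(self, task_id: str, results: Any) -> None:
        # Test-only helper (not part of the interface): simulate task completion.
        self._records[task_id].update(status="Completed", results=results)


handler = InMemoryHandler()
handler.submit_task_via_zapier({"nshots": 10}, "task-1")
status_before = handler.query_task_status("task-1")
handler.finish("task-1", {"shots": [0, 1, 1]})
status_after = handler.query_task_status("task-1")
results = handler.fetch_results("task-1")
print(status_before, status_after, results)
```

Because the task id is supplied at submission time, the same id is the only key needed for the later status and result queries, which is the role task_id plays in the methods documented above.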