Module quera_ahs_utils.parallelize
Expand source code
from itertools import product
import numpy as np
from typing import Tuple,Union,NoReturn,Optional
from decimal import Decimal
from braket.aws import AwsDevice
from braket.ahs.analog_hamiltonian_simulation import AnalogHamiltonianSimulation,DrivingField
from braket.ahs.field import Field
from braket.task_result import AnalogHamiltonianSimulationTaskResult
from braket.ahs.atom_arrangement import AtomArrangement
from braket.ahs.pattern import Pattern
__all__ = [
'generate_parallel_register',
'parallelize_field',
'parallelize_hamiltonian',
'parallelize_ahs',
'get_shots_braket_sdk_results',
'parallelize_quera_json',
'get_shots_quera_results',
]
def generate_parallel_register(
register:AtomArrangement,
qpu:AwsDevice,
interproblem_distance:Union[float,Decimal]
) -> Tuple[AtomArrangement,dict]:
"""generate grid of parallel registers from a single register.
Args:
register (AtomArrangement): The register set to parallelize
qpu (AwsDevice): The QPU that will be running the parallel job
interproblem_distance (UnionType[float,Decimal]): _description_
Returns:
_type_: _description_
"""
x_min = min(*[site.coordinate[0] for site in register])
x_max = max(*[site.coordinate[0] for site in register])
y_min = min(*[site.coordinate[1] for site in register])
y_max = max(*[site.coordinate[1] for site in register])
single_problem_width = x_max - x_min
single_problem_height = y_max - y_min
# get values from device capabilities
field_of_view_width = qpu.properties.paradigm.lattice.area.width
field_of_view_height = qpu.properties.paradigm.lattice.area.height
n_site_max = qpu.properties.paradigm.lattice.geometry.numberSitesMax
# setting up a grid of problems filling the total area
n_width = int(float(field_of_view_width) // (single_problem_width + interproblem_distance))
n_height = int(float(field_of_view_height) // (single_problem_height + interproblem_distance))
batch_mapping = dict()
parallel_register = AtomArrangement()
atom_number = 0 #counting number of atoms added
for ix in range(n_width):
x_shift = ix * (single_problem_width + interproblem_distance)
for iy in range(n_height):
y_shift = iy * (single_problem_height + interproblem_distance)
# reached the maximum number of batches possible given n_site_max
if atom_number + len(register) > n_site_max: break
atoms = []
for site in register:
new_coordinate = (x_shift + site.coordinate[0], y_shift + site.coordinate[1])
parallel_register.add(new_coordinate,site.site_type)
atoms.append(atom_number)
atom_number += 1
batch_mapping[(ix,iy)] = atoms
return parallel_register,batch_mapping
def parallelize_field(
field:Field,
batch_mapping:dict
) -> Field:
"""Generate parallel field from a batch_mapping
Args:
field (Field): the field to parallelize
batch_mapping (dict): the mapping that describes the parallelization
Raises:
NotImplementedError: currently not supporting local detuning.
Returns:
Field: the new field that works for the parallel program.
"""
if field.pattern == None:
return field
else:
raise NotImplementedError("Non-uniform pattern note supported in parallelization")
def parallelize_hamiltonian(
driving_field: DrivingField,
batch_mapping:dict
) -> DrivingField:
"""Generate the parallel driving fields from a batch_mapping.
Args:
driving_field (DrivingField): The fields to parallelize
batch_mapping (dict): the mapping that generates the parallelization
Returns:
DrivingField: the parallelized driving field.
"""
return DrivingField(
amplitude=parallelize_field(driving_field.amplitude,batch_mapping),
phase=parallelize_field(driving_field.phase,batch_mapping),
detuning=parallelize_field(driving_field.detuning,batch_mapping)
)
def parallelize_ahs(
ahs:AnalogHamiltonianSimulation,
qpu: AwsDevice,
interproblem_distance: Union[float,Decimal]
) -> AnalogHamiltonianSimulation:
"""Generate parallel ahs program.
Args:
ahs (AnalogHamiltonianSimulation): The program to parallelize
qpu (AwsDevice): The device to run the parallel jobs
interproblem_distance (float, Decimal): The distance between the programs.
Returns:
AnalogHamiltonianSimulation: The new parallel program ready to run.
"""
parallel_register,batch_mapping = generate_parallel_register(ahs.register,qpu,interproblem_distance)
parallel_program = AnalogHamiltonianSimulation(
register=parallel_register,
hamiltonian=parallelize_hamiltonian(ahs.hamiltonian,batch_mapping)
)
return parallel_program,batch_mapping
def get_shots_braket_sdk_results(
results: AnalogHamiltonianSimulationTaskResult,
batch_mapping:Optional[dict]=None,
post_select:Optional[bool]=True
)-> np.array:
"""get the shot results from a braket-sdk task results type.
Args:
results (AnalogHamiltonianSimulationTaskResult):
The task results to process.
batch_mapping (Optional[dict], optional):
The parallel mapping generated from some parallelization. Defaults to None.
post_select (bool, optional):
Post select if atom fails to be sorted in the shot results. Defaults to True.
Returns:
np.array: The shot results stored as 1,0 in the rows of the array.
1 is the rydberg state and 0 is the ground state.
"""
# collecting QPU Data
has_defects = lambda x: np.any(x==0) if post_select else True
if batch_mapping == None:
all_sequences = [m.post_sequence for m in results.measurements if has_defects(m.pre_sequence)]
else:
all_sequences = [m.post_sequence[inds] for m,inds in product(results.measurements,batch_mapping.values()) if has_defects(m.pre_sequence[inds])]
return np.array(all_sequences)
def parallelize_quera_json(
input_json: dict,
interproblem_distance:float,
qpu_width:float,
qpu_height:float,
n_site_max:int
) -> Tuple[dict,dict]:
"""Generate a parallel QuEra json program from a single program.
Args:
input_json (dict): The input program to parallelize
interproblem_distance (float): The distance between parallel problems
qpu_width (float): The field of view width for the program
qpu_height (float): The field of view height for the program
n_site_max (int): Maximum number of sites allowed for a program.
Raises:
NotImplementedError: local detuning currently not supported.
Returns:
Tuple[dict,dict]: first element is the parallelized program as a dict.
The second element of the tuple is the batch mapping for post processing.
"""
lattice = input_json["lattice"]
if "local" in input_json["effective_hamiltonian"]["rydberg"]["detuning"]:
raise NotImplementedError("local detuning not supported in this function")
else:
output_json = {}
output_json.update(input_json)
sites = lattice["sites"]
filling = lattice["filling"]
if len(sites) > 1:
xmin = min(*[site[0] for site in sites])
xmax = max(*[site[0] for site in sites])
ymin = min(*[site[1] for site in sites])
ymax = max(*[site[1] for site in sites])
single_problem_width = xmax-xmin
single_problem_height = ymax-ymin
else:
single_problem_width = 0
single_problem_height = 0
n_width = int(qpu_width // (single_problem_width + interproblem_distance))
n_height = int(qpu_height // (single_problem_height + interproblem_distance))
parallel_sites = []
parallel_filling = []
atom_number = 0 #counting number of atoms added
batch_mapping = {}
for ix in range(n_width):
x_shift = np.around(ix * (single_problem_width + interproblem_distance),14)
for iy in range(n_height):
y_shift = np.around(iy * (single_problem_height + interproblem_distance),14)
# reached the maximum number of batches possible given n_site_max
if atom_number + len(sites) > n_site_max: break
atoms = []
for site,fill in zip(sites,filling):
new_site = [x_shift + site[0], y_shift + site[1]]
parallel_sites.append(new_site)
parallel_filling.append(fill)
atoms.append(atom_number)
atom_number += 1
batch_mapping[f"({ix},{iy})"] = atoms
output_json["lattice"]["sites"] = parallel_sites
output_json["lattice"]["filling"] = parallel_filling
return output_json,batch_mapping
def get_shots_quera_results(
results_json: dict,
batch_mapping:Optional[dict]=None,
post_select:Optional[bool]=True
) -> np.array:
"""Get the shots out of a QuEra programming
Args:
results_json (dict): _description_
batch_mapping (dict, optional):
The parallel mapping generated from some parallelization. Defaults to None.
post_select (bool, optional):
Post select if atom fails to be sorted in the shot results. Defaults to True.
Returns:
np.array: The shot results stored as 1,0 in the rows of the array.
1 is the rydberg state and 0 is the ground state.
"""
# collecting QPU Data
no_defects = lambda bits: np.all(bits==1) if post_select else True
shots_list = results_json["shot_outputs"]
pre_and_post = [(np.array(m["pre_sequence"]),np.array(m["post_sequence"])) for m in shots_list if m["shot_status_code"]==200]
if batch_mapping == None:
all_sequences = [post for pre,post in pre_and_post if no_defects(pre)]
else:
all_sequences = [post[inds] for (pre,post),inds in product(pre_and_post,batch_mapping.values()) if no_defects(pre[inds])]
return np.array(all_sequences)
Functions
def generate_parallel_register(register: braket.ahs.atom_arrangement.AtomArrangement, qpu: braket.aws.aws_device.AwsDevice, interproblem_distance: Union[float, decimal.Decimal]) -> Tuple[braket.ahs.atom_arrangement.AtomArrangement, dict]
-
generate grid of parallel registers from a single register.
Args
register
:AtomArrangement
- The register set to parallelize
qpu
:AwsDevice
- The QPU that will be running the parallel job
interproblem_distance
:UnionType[float,Decimal]
- description
Returns
_type_
- description
Expand source code
def generate_parallel_register( register:AtomArrangement, qpu:AwsDevice, interproblem_distance:Union[float,Decimal] ) -> Tuple[AtomArrangement,dict]: """generate grid of parallel registers from a single register. Args: register (AtomArrangement): The register set to parallelize qpu (AwsDevice): The QPU that will be running the parallel job interproblem_distance (UnionType[float,Decimal]): _description_ Returns: _type_: _description_ """ x_min = min(*[site.coordinate[0] for site in register]) x_max = max(*[site.coordinate[0] for site in register]) y_min = min(*[site.coordinate[1] for site in register]) y_max = max(*[site.coordinate[1] for site in register]) single_problem_width = x_max - x_min single_problem_height = y_max - y_min # get values from device capabilities field_of_view_width = qpu.properties.paradigm.lattice.area.width field_of_view_height = qpu.properties.paradigm.lattice.area.height n_site_max = qpu.properties.paradigm.lattice.geometry.numberSitesMax # setting up a grid of problems filling the total area n_width = int(float(field_of_view_width) // (single_problem_width + interproblem_distance)) n_height = int(float(field_of_view_height) // (single_problem_height + interproblem_distance)) batch_mapping = dict() parallel_register = AtomArrangement() atom_number = 0 #counting number of atoms added for ix in range(n_width): x_shift = ix * (single_problem_width + interproblem_distance) for iy in range(n_height): y_shift = iy * (single_problem_height + interproblem_distance) # reached the maximum number of batches possible given n_site_max if atom_number + len(register) > n_site_max: break atoms = [] for site in register: new_coordinate = (x_shift + site.coordinate[0], y_shift + site.coordinate[1]) parallel_register.add(new_coordinate,site.site_type) atoms.append(atom_number) atom_number += 1 batch_mapping[(ix,iy)] = atoms return parallel_register,batch_mapping
def get_shots_braket_sdk_results(results: braket.task_result.analog_hamiltonian_simulation_task_result_v1.AnalogHamiltonianSimulationTaskResult, batch_mapping: Optional[dict] = None, post_select: Optional[bool] = True) ->
-
get the shot results from a braket-sdk task results type.
Args
results
:AnalogHamiltonianSimulationTaskResult
- The task results to process.
batch_mapping
:Optional[dict]
, optional- The parallel mapping generated from some parallelization. Defaults to None.
post_select
:bool
, optional
Post select if atom fails to be sorted in the shot results. Defaults to True.
Returns
np.array
- The shot results stored as 1,0 in the rows of the array.
1 is the rydberg state and 0 is the ground state.
Expand source code
def get_shots_braket_sdk_results( results: AnalogHamiltonianSimulationTaskResult, batch_mapping:Optional[dict]=None, post_select:Optional[bool]=True )-> np.array: """get the shot results from a braket-sdk task results type. Args: results (AnalogHamiltonianSimulationTaskResult): The task results to process. batch_mapping (Optional[dict], optional): The parallel mapping generated from some parallelization. Defaults to None. post_select (bool, optional): Post select if atom fails to be sorted in the shot results. Defaults to True. Returns: np.array: The shot results stored as 1,0 in the rows of the array. 1 is the rydberg state and 0 is the ground state. """ # collecting QPU Data has_defects = lambda x: np.any(x==0) if post_select else True if batch_mapping == None: all_sequences = [m.post_sequence for m in results.measurements if has_defects(m.pre_sequence)] else: all_sequences = [m.post_sequence[inds] for m,inds in product(results.measurements,batch_mapping.values()) if has_defects(m.pre_sequence[inds])] return np.array(all_sequences)
def get_shots_quera_results(results_json: dict, batch_mapping: Optional[dict] = None, post_select: Optional[bool] = True) ->
-
Get the shots out of a QuEra programming
Args
results_json
:dict
- description
batch_mapping
:dict
, optional- The parallel mapping generated from some parallelization. Defaults to None.
post_select
:bool
, optional
Post select if atom fails to be sorted in the shot results. Defaults to True.
Returns
np.array
- The shot results stored as 1,0 in the rows of the array.
1 is the rydberg state and 0 is the ground state.
Expand source code
def get_shots_quera_results( results_json: dict, batch_mapping:Optional[dict]=None, post_select:Optional[bool]=True ) -> np.array: """Get the shots out of a QuEra programming Args: results_json (dict): _description_ batch_mapping (dict, optional): The parallel mapping generated from some parallelization. Defaults to None. post_select (bool, optional): Post select if atom fails to be sorted in the shot results. Defaults to True. Returns: np.array: The shot results stored as 1,0 in the rows of the array. 1 is the rydberg state and 0 is the ground state. """ # collecting QPU Data no_defects = lambda bits: np.all(bits==1) if post_select else True shots_list = results_json["shot_outputs"] pre_and_post = [(np.array(m["pre_sequence"]),np.array(m["post_sequence"])) for m in shots_list if m["shot_status_code"]==200] if batch_mapping == None: all_sequences = [post for pre,post in pre_and_post if no_defects(pre)] else: all_sequences = [post[inds] for (pre,post),inds in product(pre_and_post,batch_mapping.values()) if no_defects(pre[inds])] return np.array(all_sequences)
def parallelize_ahs(ahs: braket.ahs.analog_hamiltonian_simulation.AnalogHamiltonianSimulation, qpu: braket.aws.aws_device.AwsDevice, interproblem_distance: Union[float, decimal.Decimal]) -> braket.ahs.analog_hamiltonian_simulation.AnalogHamiltonianSimulation
-
Generate parallel ahs program.
Args
ahs
:AnalogHamiltonianSimulation
- The program to parallelize
qpu
:AwsDevice
- The device to run the parallel jobs
interproblem_distance
:float, Decimal
- The distance between the programs.
Returns
AnalogHamiltonianSimulation
- The new parallel program ready to run.
Expand source code
def parallelize_ahs( ahs:AnalogHamiltonianSimulation, qpu: AwsDevice, interproblem_distance: Union[float,Decimal] ) -> AnalogHamiltonianSimulation: """Generate parallel ahs program. Args: ahs (AnalogHamiltonianSimulation): The program to parallelize qpu (AwsDevice): The device to run the parallel jobs interproblem_distance (float, Decimal): The distance between the programs. Returns: AnalogHamiltonianSimulation: The new parallel program ready to run. """ parallel_register,batch_mapping = generate_parallel_register(ahs.register,qpu,interproblem_distance) parallel_program = AnalogHamiltonianSimulation( register=parallel_register, hamiltonian=parallelize_hamiltonian(ahs.hamiltonian,batch_mapping) ) return parallel_program,batch_mapping
def parallelize_field(field: braket.ahs.field.Field, batch_mapping: dict) -> braket.ahs.field.Field
-
Generate parallel field from a batch_mapping
Args
field
:Field
- the field to parallelize
batch_mapping
:dict
- the mapping that describes the parallelization
Raises
NotImplementedError
- currently not supporting local detuning.
Returns
Field
- the new field that works for the parallel program.
Expand source code
def parallelize_field( field:Field, batch_mapping:dict ) -> Field: """Generate parallel field from a batch_mapping Args: field (Field): the field to parallelize batch_mapping (dict): the mapping that describes the parallelization Raises: NotImplementedError: currently not supporting local detuning. Returns: Field: the new field that works for the parallel program. """ if field.pattern == None: return field else: raise NotImplementedError("Non-uniform pattern note supported in parallelization")
def parallelize_hamiltonian(driving_field: braket.ahs.driving_field.DrivingField, batch_mapping: dict) -> braket.ahs.driving_field.DrivingField
-
Generate the parallel driving fields from a batch_mapping.
Args
driving_field
:DrivingField
- The fields to parallelize
batch_mapping
:dict
- the mapping that generates the parallelization
Returns
DrivingField
- the parallelized driving field.
Expand source code
def parallelize_hamiltonian( driving_field: DrivingField, batch_mapping:dict ) -> DrivingField: """Generate the parallel driving fields from a batch_mapping. Args: driving_field (DrivingField): The fields to parallelize batch_mapping (dict): the mapping that generates the parallelization Returns: DrivingField: the parallelized driving field. """ return DrivingField( amplitude=parallelize_field(driving_field.amplitude,batch_mapping), phase=parallelize_field(driving_field.phase,batch_mapping), detuning=parallelize_field(driving_field.detuning,batch_mapping) )
def parallelize_quera_json(input_json: dict, interproblem_distance: float, qpu_width: float, qpu_height: float, n_site_max: int) -> Tuple[dict, dict]
-
Generate a parallel QuEra json program from a single program.
Args
input_json
:dict
- The input program to parallelize
interproblem_distance
:float
- The distance between parallel problems
qpu_width
:float
- The field of view width for the program
qpu_height
:float
- The field of view height for the program
n_site_max
:int
- Maximum number of sites allowed for a program.
Raises
NotImplementedError
- local detuning currently not supported.
Returns
Tuple[dict,dict]
- first element is the parallelized program as a dict.
The second element of the tuple is the batch mapping for post processing.
Expand source code
def parallelize_quera_json( input_json: dict, interproblem_distance:float, qpu_width:float, qpu_height:float, n_site_max:int ) -> Tuple[dict,dict]: """Generate a parallel QuEra json program from a single program. Args: input_json (dict): The input program to parallelize interproblem_distance (float): The distance between parallel problems qpu_width (float): The field of view width for the program qpu_height (float): The field of view height for the program n_site_max (int): Maximum number of sites allowed for a program. Raises: NotImplementedError: local detuning currently not supported. Returns: Tuple[dict,dict]: first element is the parallelized program as a dict. The second element of the tuple is the batch mapping for post processing. """ lattice = input_json["lattice"] if "local" in input_json["effective_hamiltonian"]["rydberg"]["detuning"]: raise NotImplementedError("local detuning not supported in this function") else: output_json = {} output_json.update(input_json) sites = lattice["sites"] filling = lattice["filling"] if len(sites) > 1: xmin = min(*[site[0] for site in sites]) xmax = max(*[site[0] for site in sites]) ymin = min(*[site[1] for site in sites]) ymax = max(*[site[1] for site in sites]) single_problem_width = xmax-xmin single_problem_height = ymax-ymin else: single_problem_width = 0 single_problem_height = 0 n_width = int(qpu_width // (single_problem_width + interproblem_distance)) n_height = int(qpu_height // (single_problem_height + interproblem_distance)) parallel_sites = [] parallel_filling = [] atom_number = 0 #counting number of atoms added batch_mapping = {} for ix in range(n_width): x_shift = np.around(ix * (single_problem_width + interproblem_distance),14) for iy in range(n_height): y_shift = np.around(iy * (single_problem_height + interproblem_distance),14) # reached the maximum number of batches possible given n_site_max if atom_number + len(sites) > n_site_max: break atoms = [] for site,fill in zip(sites,filling): new_site = [x_shift + site[0], y_shift + site[1]] parallel_sites.append(new_site) parallel_filling.append(fill) atoms.append(atom_number) atom_number += 1 batch_mapping[f"({ix},{iy})"] = atoms output_json["lattice"]["sites"] = parallel_sites output_json["lattice"]["filling"] = parallel_filling return output_json,batch_mapping