Source code for yastn.tn.fpeps.envs._env_boundary_mps

# Copyright 2024 The YASTN Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
from itertools import accumulate
from .... import Tensor, YastnError
from ... import mps
from ._env_auxlliary import identity_tm_boundary


[docs] class EnvBoundaryMps: r""" Boundary MPS class for finite PEPS contraction. """ def __init__(self, psi, opts_svd, setup='l', opts_var=None): self.psi = psi self._env = {} li, ri = 0, psi.Ny-1 ti, bi = 0, psi.Nx-1 if 'l' in setup or 'r' in setup: tmpo = psi.transfer_mpo(n=ri, dirn='v') self._env['r', ri] = identity_tm_boundary(tmpo) tmpo = psi.transfer_mpo(n=li, dirn='v').H self._env['l', li] = identity_tm_boundary(tmpo) if 'b' in setup or 't' in setup: tmpo = psi.transfer_mpo(n=ti, dirn='h') self._env['t', ti] = identity_tm_boundary(tmpo) tmpo = psi.transfer_mpo(n=bi, dirn='h').H self._env['b', bi] = identity_tm_boundary(tmpo) self.info = {} if opts_var == None: opts_var = {'max_sweeps': 2, 'normalize': False,} if 'r' in setup: for ny in range(ri-1, li-1, -1): tmpo = psi.transfer_mpo(n=ny+1, dirn='v') phi0 = self._env['r', ny+1] self._env['r', ny], discarded = mps.zipper(tmpo, phi0, opts_svd, return_discarded=True) mps.compression_(self._env['r', ny], (tmpo, phi0), **opts_var) self.info['r', ny] = {'discarded': discarded} if 'l' in setup: for ny in range(li+1, ri+1): tmpo = psi.transfer_mpo(n=ny-1, dirn='v').H phi0 = self._env['l', ny-1] self._env['l', ny], discarded = mps.zipper(tmpo, phi0, opts_svd, return_discarded=True) mps.compression_(self._env['l', ny], (tmpo, phi0), **opts_var) self.info['l', ny] = {'discarded': discarded} if 't' in setup: for nx in range(ti+1, bi+1): tmpo = psi.transfer_mpo(n=nx-1, dirn='h') phi0 = self._env['t', nx-1] self._env['t', nx], discarded = mps.zipper(tmpo, phi0, opts_svd, return_discarded=True) mps.compression_(self._env['t', nx], (tmpo, phi0), **opts_var) self.info['t', nx] = {'discarded': discarded} if 'b' in setup: for nx in range(bi-1, ti-1, -1): tmpo = psi.transfer_mpo(n=nx+1, dirn='h').H phi0 = self._env['b', nx+1] self._env['b', nx], discarded = mps.zipper(tmpo, phi0, opts_svd, return_discarded=True) mps.compression_(self._env['b', nx], (tmpo, phi0), **opts_var) self.info['b', nx] = {'discarded': discarded} def boundary_mps(self, n, dirn): return self._env[dirn, n]
[docs] def measure_1site(peps_env, op): """ Calculate all 1-point expectation values <o> in a finite peps. Takes CTM environments and operators. o1 are given as dict[tuple[int, int], dict[int, operators]], mapping sites with list of operators at each site. """ out = {} psi = peps_env.psi Nx, Ny = psi.Nx, psi.Ny sites = [(nx, ny) for ny in range(Ny-1, -1, -1) for nx in range(Nx)] opdict = _clear_operator_input(op, sites) for ny in range(Ny-1, -1, -1): vR = peps_env.boundary_mps(n=ny, dirn='r') vL = peps_env.boundary_mps(n=ny, dirn='l') Os = psi.transfer_mpo(n=ny, dirn='v') env = mps.Env(vL, [Os, vR]).setup_(to='first').setup_(to='last') norm_env = env.measure() for nx in range(Nx): if (nx, ny) in opdict: for nz, o in opdict[nx, ny].items(): Os[nx].set_operator_(o) env.update_env_(nx, to='first') out[(nx, ny) + nz] = env.measure(bd=(nx-1, nx)) / norm_env return out
[docs] def measure_2site(peps_env, op1, op2, opts_svd, opts_var=None): """ Calculate all 2-point correlations <op1 op2> in a finite peps. Takes CTM environments and operators. o1 and o2 are given as dict[tuple[int, int], dict[int, operators]], mapping sites with list of operators at each site. """ out = {} if opts_var is None: opts_var = {'max_sweeps': 2} psi = peps_env.psi Nx, Ny = psi.Nx, psi.Ny sites = [(nx, ny) for ny in range(Ny-1, -1, -1) for nx in range(Nx)] op1dict = _clear_operator_input(op1, sites) op2dict = _clear_operator_input(op2, sites) for nx1, ny1 in sites: # print( f"Correlations from {nx1} {ny1} ... ") for nz1, o1 in op1dict[nx1, ny1].items(): vR = peps_env.boundary_mps(n=ny1, dirn='r') vL = peps_env.boundary_mps(n=ny1, dirn='l') Os = psi.transfer_mpo(n=ny1, dirn='v') env = mps.Env(vL, [Os, vR]).setup_(to='first').setup_(to='last') norm_env = env.measure(bd=(-1, 0)) if ny1 > 0: vRnext = mps.zipper(Os, vR, opts_svd=opts_svd) mps.compression_(vRnext, (Os, vR), method='1site', normalize=False, **opts_var) # first calculate on-site correlations for nz2, o2 in op2dict[nx1, ny1].items(): Os[nx1].set_operator_(o1 @ o2) env.update_env_(nx1, to='last') out[(nx1, ny1) + nz1, (nx1, ny1) + nz2] = env.measure(bd=(nx1, nx1+1)) / norm_env Os[nx1].set_operator_(o1) env.setup_(to='last') if ny1 > 0: vRo1next = mps.zipper(Os, vR, opts_svd=opts_svd) mps.compression_(vRo1next, (Os, vR), method='1site', normalize=False, **opts_var) # calculate correlations along the row for nx2 in range(nx1 + 1, Nx): for nz2, o2 in op2dict[nx2, ny1].items(): Os[nx2].set_operator_(o2) env.update_env_(nx2, to='first') out[(nx1, ny1) + nz1, (nx2, ny1) + nz2] = env.measure(bd=(nx2-1, nx2)) / norm_env # and all subsequent rows for ny2 in range(ny1-1, -1, -1): vR = vRnext vRo1 = vRo1next vL = peps_env.boundary_mps(n=ny2, dirn='l') Os = psi.transfer_mpo(n=ny2, dirn='v') env = mps.Env(vL, [Os, vR]).setup_(to='first') norm_env = env.measure(bd=(-1, 0)) if ny2 > 0: vRnext = mps.zipper(Os, vR, opts_svd=opts_svd) mps.compression_(vRnext, (Os, vR), method='1site', normalize=False, **opts_var) vRo1next = mps.zipper(Os, vRo1, opts_svd=opts_svd) mps.compression_(vRo1next, (Os, vRo1), method='1site', normalize=False, **opts_var) env = mps.Env(vL, [Os, vRo1]).setup_(to='first').setup_(to='last') for nx2 in range(psi.Nx): for nz2, o2 in op2dict[nx2, ny2].items(): Os[nx2].set_operator_(o2) env.update_env_(nx2, to='first') out[(nx1, ny1) + nz1, (nx2, ny2) + nz2] = env.measure(bd=(nx2-1, nx2)) / norm_env return out
[docs] def sample(peps_env, projectors, opts_svd=None, opts_var=None): """ Sample a random configuration from a finite PEPS. Takes CTM environments and a complete list of projectors to sample from. """ psi = peps_env.psi config = psi[0, 0].config rands = (config.backend.rand(psi.Nx * psi.Ny) + 1) / 2 out = {} count = 0 vR = peps_env.boundary_mps(n=psi.Ny-1, dirn='r') # right boundary of indexed column through CTM environment tensors for ny in range(psi.Ny - 1, -1, -1): Os = psi.transfer_mpo(n=ny, dirn='v') # converts ny colum of PEPS to MPO vL = peps_env.boundary_mps(n=ny, dirn='l') # left boundary of indexed column through CTM environment tensors env = mps.Env(vL, [Os, vR]).setup_(to = 'first') for nx in range(0, psi.Nx): loc_projectors = projectors[nx, ny] prob = [] norm_prob = env.measure(bd=(nx - 1, nx)) for proj in loc_projectors: Os[nx].set_operator_(proj) env.update_env_(nx, to='last') prob.append(env.measure(bd=(nx, nx+1)) / norm_prob) assert abs(sum(prob) - 1) < 1e-12 rand = rands[count] ind = sum(apr < rand for apr in accumulate(prob)) out[nx, ny] = ind Os[nx].set_operator_(loc_projectors[ind]) # updated with the new collapse env.update_env_(nx, to='last') count += 1 if opts_svd is None: opts_svd = {'D_total': max(vL.get_bond_dimensions())} vRnew = mps.zipper(Os, vR, opts_svd=opts_svd) if opts_var is None: opts_var = {} mps.compression_(vRnew, (Os, vR), method='1site', **opts_var) vR = vRnew return out
[docs] def sample_MC_(proj_env, st0, st1, st2, psi, projectors, opts_svd, opts_var, trial="local"): """ Monte Carlo steps in a finite peps. Makes two steps while sweeping finite lattice back and forth. Takes environments and a complete list of projectors to sample from. proj_env, st1, st2 are updated in place """ if trial == "local": _sample_MC_column = _sample_MC_column_local elif trial == "uniform": _sample_MC_column = _sample_MC_column_uniform else: raise YastnError(f"{trial=} not supported.") Nx, Ny = psi.Nx, psi.Ny config = psi[0, 0].config # pre-draw uniformly distributed random numbers as iterator; rands = iter((config.backend.rand(2 * Nx * Ny) + 1) / 2) # in [0, 1] # sweep though the lattice accept = 0 for ny in range(Ny-1, -1, -1): vR, Os, _, astep = _sample_MC_column(ny, proj_env, st0, st1, psi, projectors, rands) accept += astep if ny > 0: vRnew = mps.zipper(Os, vR, opts_svd=opts_svd) mps.compression_(vRnew, (Os, vR), method='1site', **opts_var) proj_env._env['r', ny-1] = vRnew proj_env._env.pop(('l', ny)) for ny in range(Ny): _, Os, vL, astep = _sample_MC_column(ny, proj_env, st1, st2, psi, projectors, rands) accept += astep if ny < Ny - 1: OsT = Os.H vLnew = mps.zipper(OsT, vL, opts_svd=opts_svd) mps.compression_(vLnew, (OsT, vL), method='1site', **opts_var) proj_env._env['l', ny+1] = vLnew proj_env._env.pop(('r', ny)) return accept / (2 * Nx * Ny) # acceptance rate
def _clear_operator_input(op, sites): op_dict = op.copy() if isinstance(op, dict) else {site: op for site in sites} for k, v in op_dict.items(): if isinstance(v, dict): op_dict[k] = {(i,): vi for i, vi in v.items()} elif isinstance(v, Tensor): op_dict[k] = {(): v} else: # is iterable op_dict[k] = {(i,): vi for i, vi in enumerate(v)} return op_dict def _sample_MC_column_local(ny, proj_env, st0, st1, psi, projectors, rands): # update is proposed based on local probabilies vR = proj_env.boundary_mps(n=ny, dirn='r') Os = proj_env.psi.transfer_mpo(n=ny, dirn='v') vL = proj_env.boundary_mps(n=ny, dirn='l') env = mps.Env(vL, [Os, vR]).setup_(to='first') for nx in range(psi.Nx): amp = env.hole(nx).tensordot(psi[nx, ny], axes=((0, 1), (0, 1))) prob = [abs(amp.vdot(pr, conj=(0, 0))) ** 2 for pr in projectors[nx, ny]] sumprob = sum(prob) prob = [x / sumprob for x in prob] rand = next(rands) ind = sum(x < rand for x in accumulate(prob)) st1[nx, ny] = ind proj_env.psi[nx, ny] = (psi[nx, ny] @ projectors[nx, ny][ind]) Os[nx] = proj_env.psi[nx, ny] env.update_env_(nx, to='last') accept = psi.Nx return vR, Os, vL, accept def _sample_MC_column_uniform(ny, proj_env, st0, st1, psi, projectors, rands): # update is proposed from uniform local distribution config = proj_env.psi[0, 0].config accept = 0 vR = proj_env.boundary_mps(n=ny, dirn='r') Os = proj_env.psi.transfer_mpo(n=ny, dirn='v') vL = proj_env.boundary_mps(n=ny, dirn='l') env = mps.Env(vL, [Os, vR]).setup_(to='first') for nx in range(psi.Nx): A = psi[nx, ny] ind0 = st0[nx, ny] ind1 = config.backend.randint(0, len(projectors[nx, ny])) pr0 = projectors[nx, ny][ind0] pr1 = projectors[nx, ny][ind1] A0 = (A @ pr0).unfuse_legs(axes=(0, 1)) A1 = (A @ pr1).unfuse_legs(axes=(0, 1)) Os[nx] = A1 env.update_env_(nx, to='last') prob_new = abs(env.measure(bd=(nx, nx+1))) ** 2 Os[nx] = A0 env.update_env_(nx, to='last') prob_old = abs(env.measure(bd=(nx, nx+1))) ** 2 if next(rands) < prob_new / prob_old: # accept accept += 1 st1[nx, ny] = ind1 proj_env.psi[nx, ny] = A1 Os[nx] = A1 env.update_env_(nx, to='last') else: # reject st1[nx, ny] = ind0 return vR, Os, vL, accept