iclientpy.rest.api.cache 源代码

import re
import time
import uuid
import os
from typing import List
import progressbar
from iclientpy.rest.api.model import Rectangle2D, Point2D, ServiceType, TileSize, OutputFormat, PostWorkspaceParameter, \
    PostTileJobsItem, BuildState, PostTilesetUpdateJobs, SMTilesTileSourceInfo, TilesetExportJobRunState, TileType, \
    TileSourceInfo
from iclientpy.rest.api.management import Management
from iclientpy.rest.api.model import PostFileUploadTasksParam, FileUploadState
from iclientpy.rest.apifactory import APIFactory
from .cacheutils import provider_setting_to_tile_source_info
from iclientpy.dtojson import to_json_str
import zipfile
from io import BufferedIOBase


[文档]def output(tip: str): print(tip)
field_and_desc = { 'address': '地址', 'username': '用户名', 'password': '密码', 'token': 'token', 'component_name': '服务名称', 'w_loc': '工作空间路径', 'map_name': '切图地图名称', 'original_point': '切图原点', 'cache_bounds': '缓存范围', 'scale': '缓存比例尺分母', 'w_servicetype': '工作空间服务类型', 'tile_size': '切片大小', 'tile_type': '切片类型', 'format': '切片输出格式', 'epsg_code': '投影', 'output': '输出路径' }
[文档]def confirm(**d): output('执行参数如下:') for field in field_and_desc.keys(): if field in d: val = '' if d[field] is None: val = '' elif isinstance(d[field], tuple): val = ','.join(map(str, d[field])) elif isinstance(d[field], bool): val = str(d[field]) elif not isinstance(d[field], str): val = to_json_str(d[field]) else: val = d[field] output(field_and_desc[field] + ": " + val) print('是否继续?(Y/N)', end=' ') return input('')
[文档]def cache_service(address: str, username: str, password: str, component_name: str, map_name: str, original_point: tuple, cache_bounds: tuple, scale: List[float] = None, tile_size: TileSize = TileSize.SIZE_256, tile_type: TileType = TileType.Image, format: OutputFormat = OutputFormat.PNG, epsg_code: int = -1, storageid: str = None, storageconfig: TileSourceInfo = None, token: str = None, quiet: bool = False, job_tile_source_type: str = 'SMTiles', output: str = None): if len(original_point) is not 2: raise Exception("切图原点坐标长度错误") tem_original_point = Point2D() tem_original_point.x = original_point[0] tem_original_point.y = original_point[1] if len(cache_bounds) is not 4: raise Exception("切图范围长度错误") tem_cache_Bounds = Rectangle2D() tem_cache_Bounds.leftBottom = Point2D() tem_cache_Bounds.leftBottom.x = cache_bounds[0] tem_cache_Bounds.leftBottom.y = cache_bounds[1] tem_cache_Bounds.rightTop = Point2D() tem_cache_Bounds.rightTop.x = cache_bounds[2] tem_cache_Bounds.rightTop.y = cache_bounds[3] api = APIFactory(address, username, password, token) mng = api.management() if scale is None or len(scale) is 0: raise Exception('未指定比例尺') if storageid is not None: storage_info = mng.get_datastore(storageid) for info in storage_info.tilesetInfos: if info.metaData.mapName == map_name: storageconfig = storage_info.tileSourceInfo if storageconfig is None: storageconfig = SMTilesTileSourceInfo() storageconfig.type = job_tile_source_type storageconfig.outputPath = output if output is not None else '../webapps/iserver/output/icpy_' + uuid.uuid1().__str__() if not quiet: confirmResult = confirm(address=address, username=username, password=password, component_name=component_name, map_name=map_name, original_point=original_point, cache_bounds=cache_bounds, scale=scale, tile_size=tile_size, tile_type=tile_type, format=format, epsg_code=epsg_code, storageid=storageid, token=token) if confirmResult.lower() == 'n': return post_tile_jobs_param = PostTileJobsItem() post_tile_jobs_param.dataConnectionString = component_name post_tile_jobs_param.mapName = map_name post_tile_jobs_param.scaleDenominators = scale post_tile_jobs_param.tileSize = tile_size post_tile_jobs_param.tileType = tile_type post_tile_jobs_param.format = format post_tile_jobs_param.epsgCode = epsg_code post_tile_jobs_param.storeConfig = storageconfig post_tile_jobs_param.originalPoint = tem_original_point post_tile_jobs_param.cacheBounds = tem_cache_Bounds ptjr = mng.post_tilejobs(post_tile_jobs_param) jobstate = mng.get_tilejob(ptjr.newResourceID).state bar = _process_bar('切图', _PercentageCounter(), jobstate.total) while (jobstate.runState is BuildState.BUILDING): time.sleep(5) jobstate = mng.get_tilejob(ptjr.newResourceID).state bar.update(jobstate.completed) gjr = mng.get_tilejob(ptjr.newResourceID) if (gjr.state.runState is not BuildState.COMPLETED): raise Exception('切图失败') if output is not None: print("切图结果存储路径为:" + output)
[文档]def cache_workspace(address: str, username: str, password: str, w_loc: str, map_name: str, original_point: tuple, cache_bounds: tuple, scale: List[float] = None, w_servicetypes: List[ServiceType] = [ServiceType.RESTMAP], tile_size: TileSize = TileSize.SIZE_256, tile_type: TileType = TileType.Image, format: OutputFormat = OutputFormat.PNG, epsg_code: int = -1, storageid: str = None, storageconfig: TileSourceInfo = None, token: str = None, quiet: bool = False, job_tile_source_type: str = 'SMTiles', output: str = None, remote_workspace: bool = False): if len(original_point) is not 2: raise Exception("切图原点坐标长度错误") tem_original_point = Point2D() tem_original_point.x = original_point[0] tem_original_point.y = original_point[1] if len(cache_bounds) is not 4: raise Exception("切图范围长度错误") tem_cache_Bounds = Rectangle2D() tem_cache_Bounds.leftBottom = Point2D() tem_cache_Bounds.leftBottom.x = cache_bounds[0] tem_cache_Bounds.leftBottom.y = cache_bounds[1] tem_cache_Bounds.rightTop = Point2D() tem_cache_Bounds.rightTop.x = cache_bounds[2] tem_cache_Bounds.rightTop.y = cache_bounds[3] api = APIFactory(address, username, password, token) mng = api.management() if scale is None or len(scale) is 0: raise Exception('未指定比例尺') if not quiet: confirmResult = confirm(address=address, username=username, password=password, w_loc=w_loc, map_name=map_name, original_point=original_point, cache_bounds=cache_bounds, scale=scale, w_servicetype=w_servicetypes, tile_size=tile_size, tile_type=tile_type, format=format, epsg_code=epsg_code, storageid=storageid, token=token, output=output) if confirmResult.lower() == 'n': return if remote_workspace: remote_workspace_file_full_path = w_loc else: param = PostFileUploadTasksParam() pfutsr = mng.post_fileuploadtasks(param) remote_workspace_file_full_path = _upload_workspace_file(mng, w_loc, pfutsr.newResourceID) gfutr = mng.get_fileuploadtask(pfutsr.newResourceID) if gfutr.state is not FileUploadState.COMPLETED: raise Exception('文件上传失败') print("工作空间上传路径为:" + remote_workspace_file_full_path) post_param = PostWorkspaceParameter() post_param.workspaceConnectionInfo = remote_workspace_file_full_path post_param.servicesTypes = w_servicetypes pwr = mng.post_workspaces(post_param) wkn = re.findall('services/[^/]*', pwr[0].serviceAddress)[0].lstrip('services/') cache_service(address=address, username=username, password=password, component_name=wkn, map_name=map_name, original_point=original_point, cache_bounds=cache_bounds, scale=scale, tile_size=tile_size, tile_type=tile_type, format=format, epsg_code=epsg_code, storageid=storageid, storageconfig=storageconfig, token=token, quiet=True, job_tile_source_type=job_tile_source_type, output=output)
def _get_tile_source_info_from_service(mng: Management, name: str) -> TileSourceInfo: service_info = mng.get_service(name) return provider_setting_to_tile_source_info(service_info.providers[0].spSetting.config) def _custom_len(value): total = 0 for c in value: if c >= u'\u4e00' and c <= u'\u9fa5': # 中文字符宽度为2 total += 2 else: total += 1 return total def _process_bar(name: str, counter: progressbar.Counter, max_value: int) -> progressbar.ProgressBar: return progressbar.ProgressBar( widgets=[ name + '进度: ', progressbar.Bar(), ' ', counter, ], len_func=_custom_len, max_value=max_value ) class _PercentageCounter(progressbar.Counter): def __call__(self, progress, data, format=None): # processbar似乎计算宽度有问题导致换行,给多加几个空格让希望显示的内容不换行。 return '{0:5}% '.format(round(data['value'] / data['max_value'] * 100, 2)) class _DisableSeekAndTellIOWrapper(BufferedIOBase): """ zipfile压缩文件的时候会通过seek和tell来定位写入zip文件头信息。 但是管道流不能正常的seek和tell,故通过这个wrapper来禁用seek和tell,从而强迫zipfile使用流式压缩 """ def __init__(self, towrap): self._wrapped = towrap def seek(self, *args, **kwargs): raise AttributeError() def tell(self, *args, **kwargs): raise AttributeError() def seekable(self, *args, **kwargs): return False def __getattribute__(self, item): if item in ('_wrapped', 'seek', 'seekable', 'tell'): return object.__getattribute__(self, item) return self._wrapped.__getattribute__(item) def __enter__(self): self._wrapped.__enter__() return self def __exit__(self, *args, **kwargs): self._wrapped.__exit__(*args, **kwargs) def _zip_files_in_workspace_directory(w_loc: str) -> BufferedIOBase: rfd, wfd = os.pipe() def zip_in_thread(wfd): dirname = os.path.dirname(w_loc) with os.fdopen(wfd, 'wb') as outputfile: with zipfile.ZipFile(_DisableSeekAndTellIOWrapper(outputfile), 'w') as zfile: for root, dirs, files in os.walk(dirname): for name in files: fullpath = os.path.join(root, name) arcname = os.path.relpath(fullpath, dirname) zfile.write(fullpath, arcname=arcname) import threading threading.Thread(target=zip_in_thread, args=(wfd,), name='zip ' + w_loc).start() return os.fdopen(rfd, 'rb') def _upload_workspace_file(mng: Management, w_loc: str, upload_task_id: str) -> str: r_post = None # type PostFileUploadTaskResult if w_loc.lower().endswith('.zip'): with zipfile.ZipFile(w_loc) as zipf: zipfns = zipf.namelist() workspace_file_name = [item for item in zipfns if item.endswith('.sxwu')][0] with open(w_loc, 'rb') as wf: r_post = mng.post_fileuploadtask(upload_task_id, wf, './' + os.path.basename(w_loc), overwrite=True, unzip=True) else: workspace_file_name = os.path.basename(w_loc) with _zip_files_in_workspace_directory(w_loc) as wf: r_post = mng.post_fileuploadtask(upload_task_id, wf, './' + workspace_file_name.split('.')[0] + '.zip', overwrite=True, unzip=True) return r_post.filePath + './' + workspace_file_name