iclientpy.codingui.distributedanalyst 源代码

from ._aggregate import attach as aggregate_attach, new_executor as new_aggregate_executor
from ._datacatalog import get_datas
from iclientpy.rest.api.datacatalog import Datacatalog
from iclientpy.rest.api.distributedanalyst import DistributedAnalyst
from iclientpy.rest.api.restmap import MapService
from typing import Callable
from functools import partial


[文档]def get_datas_with_optionms(distributedanalyst: DistributedAnalyst, datacatalog: Datacatalog, map_service_fun: Callable[[str], MapService]): aggregate_executor = new_aggregate_executor(distributedanalyst,map_service_fun) return get_datas(datacatalog, [partial(aggregate_attach, aggregate_executor)])
# class Dataset: # name: str # # def get_datasets(profile_name:str = None, datacatalog_service_name:str = 'datacatalog/rest', distributedanalyst_service_name:str = 'distributedanalyst/rest') -> List[Dataset]: # # # profile = env.get_profile(profile_name) #type: env.Profile # api_factory = env.create_apifactory_from_profile(profile) # datacatalog = api_factory.datacatalog_service(datacatalog_service_name) # distributedanalyst = api_factory.distributedanalyst_service(distributedanalyst_service_name) # # class Job: # _id: str # _detail_method: Callable # def __init__(self, id:str, detail_method:Callable): # self._id = id # self._detail_method = detail_method # self._widget = SparkJobStateWidgets() # # # def _ipython_display_(self, **kwargs): # self._widget._ipython_display_(**kwargs) # job = self._detail_method(self._id)#type:GetDensityResultItem # self._widget.update(job.state) # while not job.state.endState: # time.sleep(3) # job = self._detail_method(self._id)#type:GetDensityResultItem # self._widget.update(job.state) # for service_info in job.setting.serviceInfo.targetServiceInfos: # if service_info.serviceType == TargetSericeType.RESTMAP: # service_name = service_info.serviceAddress[ # service_info.serviceAddress.rfind('/services/') + len('/services/'):] # map_service = api_factory.map_service(service_name) # map_name = 'kernelDensity_RecordCount_Density_Map' # bounds = map_service.get_map(map_name).viewBounds # default_tiles = icp.TileMapLayer( # url=service_info.serviceAddress + '/maps/' + map_name) # map = icp.MapView(default_tiles=default_tiles, crs='EPSG4326', # fit_bounds=[[40, -74.5], [41, -73.5]]) # map._ipython_display_(**kwargs) # # # class CSVDataset(Dataset): # def density(self, resolution:int, radius:int, mesh_size_unit:DistanceUnit, radius_unit:DistanceUnit, area_unit:AreaUnit, mesh_type:int, method:int): # postentity = PostDensityEntity() # postentity.input = DatasetInputSetting() # postentity.input.datasetName = self.name # postentity.analyst = KernelDensityAnalystSetting() # postentity.analyst.resolution = resolution # postentity.analyst.radius = radius # postentity.analyst.meshSizeUnit = mesh_size_unit # postentity.analyst.radiusUnit = radius_unit # postentity.analyst.areaUnit = area_unit # postentity.analyst.meshType = mesh_type # postentity.analyst.method = method # post_result = distributedanalyst.post_density(postentity) # if not post_result.succeed: # raise Exception(post_result.error.errorMsg) # return Job(post_result.newResourceID, distributedanalyst.get_density_job) # # def _convert_sharefile(remote_dataset:BigDataFileShareDataSetInfo): # result = CSVDataset() # result.name = remote_dataset.name # return result # # # def _list_remote_dataset(result:List[Dataset], list_method:Callable[[], DatasetsContent], detail_method:Callable, convert_fun: Callable): # contents = list_method() #type:DatasetsContent # for name in contents.datasetNames: # result.append(convert_fun(detail_method(name))) # result = [] #type: List[Dataset] # _list_remote_dataset(result, datacatalog.get_sharefile, datacatalog.get_sharefile_dataset,_convert_sharefile) # return result