# coding: utf-8
from urllib.parse import urljoin
import os
import json
import logging
from datetime import datetime, timedelta
from xml.etree.ElementTree import XML
from xml.parsers.expat import ExpatError
import requests
from geoserver import settings
from geoserver.resource import FeatureType, Coverage
from geoserver.support import prepare_upload_bundle, _decode_dict
from geoserver.workspace import workspace_from_index, Workspace
from geoserver.store import coveragestore_from_index, \
wmsstore_from_index, datastore_from_index, UnsavedDataStore, \
UnsavedCoverageStore, UnsavedWmsStore
from geoserver.layer import Layer
from geoserver.layergroup import LayerGroup, UnsavedLayerGroup
from geoserver.style import Style
LOGGER = logging.getLogger("gsconfig2.catalog")
class Catalog:
"""
The GeoServer catalog represents all of the information in the GeoServer
configuration. This includes:
- Stores of geospatial data
- Resources, or individual coherent datasets within stores
- Styles for resources
- Layers, which combine styles with resources to create a visible map layer
- LayerGroups, which alias one or more layers for convenience
- Workspaces, which provide logical grouping of Stores
- Maps, which provide a set of OWS services with a subset of the server's
Layers
- Namespaces, which provide unique identifiers for resources
"""
def __init__(self, service_url,
username=settings.DEFAULT_USERNAME,
password=settings.DEFAULT_PASSWORD,
disable_ssl_certificate_validation=False):
self._service_url = service_url
self._username = username
self._password = password
self._disable_ssl_validation = disable_ssl_certificate_validation
self._cache = dict()
self._version = None
self._session = requests.Session()
self._session.auth = (self.username, self.password)
@property
def service_url(self):
return self._service_url
@property
def username(self):
return self._username
@property
def password(self):
return self._password
@property
def disable_ssl_validation(self):
return self._disable_ssl_validation
@property
def session(self):
return self._session
@property
def version(self):
return self._version
def about(self):
"""
:return: About informations of the geoserver as a formatted html.
"""
about_url = urljoin(self.service_url,
"about/version.xml")
r = self.session.get(about_url)
if r.status_code == requests.codes.ok:
return r.text
raise FailedRequestError("Unable to determine version: {}"
.format(r.text or r.status_code))
def gsversion(self):
"""
:return: The geoserver version.
"""
if self.version:
return self.version
else:
about_text = self.about()
dom = XML(about_text)
resources = dom.findall("resource")
version = None
for resource in resources:
if resource.attrib["name"] == "GeoServer":
try:
version = resource.find("Version").text
break
except AttributeError:
pass
if version is None:
version = "<2.3.x"
self._version = version
return version
# TODO: Test
def delete(self, config_object, purge=None, recurse=False):
"""
send a delete request.
:param recurse: True if underlying objects must be deleted recursively.
:param purge:
"""
rest_url = config_object.href
params = {
'purge': purge,
'recurse': recurse,
}
headers = {
"Content-type": "application/xml",
"Accept": "application/xml"
}
r = self.session.delete(rest_url, params=params, headers=headers)
if r.status_code == requests.codes.ok:
return r.text
else:
msg = "Tried to make a DELETE request to "\
+ "{} but got a {} status code: \n{}"\
.format(rest_url, r.status_code, r.text)
raise FailedRequestError(msg)
def get_xml(self, rest_url):
LOGGER.debug("GET {}".format(rest_url))
cached_response = self._cache.get(rest_url)
if cached_response is not None:
last_cache = datetime.now() - cached_response[0]
text = cached_response[1]
cache_expire = timedelta(seconds=5)
if cached_response is None or last_cache > cache_expire:
r = self.session.get(rest_url)
text = r.text
if r.status_code == requests.codes.ok:
self._cache[rest_url] = (datetime.now(), text)
else:
msg = "Tried to make a GET request to {}"\
+ " but got a {} status code: \n{}"
raise FailedRequestError(msg.format(
rest_url,
r.status_code,
text
))
try:
return XML(text)
except (ExpatError, SyntaxError) as e:
msg = "GeoServer gave non-XML response for [GET {}]: {}"\
.format(rest_url, text)
raise Exception(msg, e)
def reload(self):
"""
Send a reload request to the GeoServer and clear the cache.
:return: The response given by the GeoServer.
"""
reload_url = urljoin(self.service_url, "reload")
r = self.session.post(reload_url)
self._cache.clear()
return r
def reset(self):
"""
Send a reset request to the GeoServer and clear the cache.
:return: The response given by the server.
"""
reset_url = urljoin(self.service_url, "reset")
r = self.session.post(reset_url)
self._cache.clear()
return r
def save(self, obj, content_type="application/xml"):
"""
saves an object to the REST service
gets the object's REST location and the data from the object,
then POSTS the request.
:param obj: The object to save.
:return: The response given by the server.
"""
rest_url = obj.href
message = obj.message()
save_method = obj.save_method
LOGGER.debug("{} {}".format(save_method, rest_url))
methods = {
settings.POST: self.session.post,
settings.PUT: self.session.put
}
headers = {
"Content-type": content_type,
"Accept": content_type
}
r = methods[save_method](rest_url, data=message, headers=headers)
self._cache.clear()
if 400 <= r.status_code < 600:
raise FailedRequestError(
"Error code ({}) from GeoServer: {}"\
.format(r.status_code, r.text)
)
return r
def get_store(self, name, workspace=None):
# Make sure workspace is a workspace object and not a string.
# If the workspace does not exist,
# continue as if no workspace had been defined.
if isinstance(workspace, str):
workspace = self.get_workspace(workspace)
# Create a list with potential workspaces to look into
# if a workspace is defined, it will contain only that workspace
# if no workspace is defined, the list will contain all workspaces.
workspaces = []
if workspace is None:
workspaces.extend(self.get_workspaces())
else:
workspaces.append(workspace)
# Iterate over all workspaces to find the stores or store
found_stores = {}
for ws in workspaces:
# Get all the store objects from geoserver
raw_stores = self.get_stores(workspace=ws)
# And put it in a dictionary where the keys are
# the name of the store,
new_stores = dict(zip([s.name for s in raw_stores], raw_stores))
# If the store is found, put it in a dict that
# also takes into account the worspace.
if name in new_stores:
found_stores[ws.name + ':' + name] = new_stores[name]
# There are 3 cases:
# a) No stores are found.
# b) Only one store is found.
# c) More than one is found.
if len(found_stores) == 0:
raise FailedRequestError("No store found named: {}".format(name))
elif len(found_stores) > 1:
msg = "Multiple stores found named '{}': {}"
raise AmbiguousRequestError(msg.format(name, found_stores.keys()))
else:
return list(found_stores.values())[0]
def get_stores(self, workspace=None):
if workspace is not None:
if isinstance(workspace, str):
workspace = self.get_workspace(workspace)
ds_list = self.get_xml(workspace.datastore_url)
cs_list = self.get_xml(workspace.coveragestore_url)
wms_list = self.get_xml(workspace.wmsstore_url)
datastores = [datastore_from_index(self, workspace, n)
for n in ds_list.findall("dataStore")]
coveragestores = [coveragestore_from_index(self, workspace, n)
for n in cs_list.findall("coverageStore")]
wmsstores = [wmsstore_from_index(self, workspace, n)
for n in wms_list.findall("wmsStore")]
return datastores + coveragestores + wmsstores
else:
stores = []
for ws in self.get_workspaces():
a = self.get_stores(ws)
stores.extend(a)
return stores
def create_datastore(self, name, workspace=None):
if isinstance(workspace, str):
workspace = self.get_workspace(workspace)
elif workspace is None:
workspace = self.get_default_workspace()
return UnsavedDataStore(self, name, workspace)
def create_coveragestore2(self, name, workspace=None):
"""
Hm we already named the method that creates a coverage *resource*
create_coveragestore... time for an API break?
"""
if isinstance(workspace, str):
workspace = self.get_workspace(workspace)
elif workspace is None:
workspace = self.get_default_workspace()
return UnsavedCoverageStore(self, name, workspace)
def create_wmsstore(self, name, workspace=None):
if workspace is None:
workspace = self.get_default_workspace()
return UnsavedWmsStore(self, name, workspace)
def create_wmslayer(self, workspace, store, name, nativeName=None):
# if not provided, fallback to name - this is what geoserver will do
# anyway but nativeName needs to be provided if name is invalid xml
# as this will cause verification errors since geoserver 2.6.1
if nativeName is None:
nativeName = name
wms_url = store.href.replace('.xml', '/wmslayers')
d = "{}{}"
data = d.format(name, nativeName)
headers = {
"Content-type": "text/xml",
"Accept": "application/xml"
}
r = self.session.post(wms_url, data=data, headers=headers)
self._cache.clear()
status_code = r.status_code
if status_code < 200 or status_code > 299:
raise UploadError(r.text)
return self.get_resource(name, store=store, workspace=workspace)
def add_data_to_store(self, store, name, data, workspace=None,
overwrite=False, charset=None):
"""
Add shapefile data to store.
"""
if isinstance(store, str):
store = self.get_store(store, workspace=workspace)
if workspace is not None:
workspace = _name(workspace)
msg = "Specified store ({}) is not in specified workspace ({})!"
msg = msg.format(store, workspace)
assert store.workspace.name == workspace, msg
else:
workspace = store.workspace.name
store = store.name
if isinstance(data, dict):
bundle = prepare_upload_bundle(name, data)
else:
bundle = data
params = dict()
if overwrite:
params["update"] = "overwrite"
if charset is not None:
params["charset"] = charset
headers = {
'Content-Type': 'application/zip',
'Accept': 'application/xml'
}
upload_url = urljoin(
self.service_url,
"workspaces/{}/datastores/{}/file.shp".format(
workspace,
store
)
)
try:
with open(bundle, "rb") as f:
data = f.read()
r = self.session.put(upload_url, params=params, data=data,
headers=headers)
self._cache.clear()
if r.status_code != 201:
raise UploadError(r.text)
finally:
os.unlink(bundle)
def create_featurestore(self, name, data, workspace=None, overwrite=False,
charset=None):
"""
Create a shapefile datastore from a shapefile.
"""
if not overwrite:
try:
self.get_store(name, workspace)
msg = "There is already a store named " + name
if workspace:
msg += " in " + str(workspace)
raise ConflictingDataError(msg)
except FailedRequestError:
# we don't really expect that every layer name will be taken
pass
if workspace is None:
workspace = self.get_default_workspace()
workspace = _name(workspace)
params = dict()
if charset is not None:
params['charset'] = charset
ds_url = urljoin(
self.service_url,
"workspaces/{}/datastores/{}/file.shp".format(
workspace, name
)
)
# PUT /workspaces//datastores//file.shp
headers = {
"Content-type": "application/zip",
"Accept": "application/xml"
}
if isinstance(data,dict):
LOGGER.debug('Data is NOT a zipfile')
archive = prepare_upload_bundle(name, data)
else:
LOGGER.debug('Data is a zipfile')
archive = data
message = open(archive, 'rb')
try:
r = self.session.put(ds_url, data=message, headers=headers,
params=params)
self._cache.clear()
if r.status_code != 201:
raise UploadError(r.text)
finally:
message.close()
os.unlink(archive)
def create_imagemosaic(self, name, data, configure=None, workspace=None,
overwrite=False, charset=None):
if not overwrite:
try:
self.get_store(name, workspace)
msg = "There is already a store named " + name
if workspace:
msg += " in " + str(workspace)
raise ConflictingDataError(msg)
except FailedRequestError:
# we don't really expect that every layer name will be taken
pass
if workspace is None:
workspace = self.get_default_workspace()
workspace = _name(workspace)
params = dict()
if charset is not None:
params['charset'] = charset
if configure is not None:
params['configure'] = "none"
cs_url = urljoin(
self.service_url,
"workspaces/{}/coveragestores/{}/file.imagemosaic".format(
workspace, name
)
)
# PUT /workspaces//coveragestores//file.imagemosaic?configure=none
headers = {
"Content-type": "application/zip",
"Accept": "application/xml"
}
if isinstance(data, str):
message = open(data, 'rb')
else:
message = data
try:
r = self.session.put(cs_url, data=message, headers=headers,
params=params)
self._cache.clear()
if r.status_code != 201:
raise UploadError(r.text)
finally:
if hasattr(message, "close"):
message.close()
def create_coveragestore(self, name, data, workspace=None,
overwrite=False):
self._create_coveragestore(name, data, workspace, overwrite)
def create_coveragestore_external_geotiff(self, name, data,workspace=None,
overwrite=False):
self._create_coveragestore(name, data, workspace=workspace,
overwrite=overwrite, external=True)
def _create_coveragestore(self, name, data, workspace=None,
overwrite=False, external=False):
if not overwrite:
try:
store = self.get_store(name, workspace)
msg = "There is already a store named " + name
if workspace:
msg += " in " + str(workspace)
raise ConflictingDataError(msg)
except FailedRequestError:
# we don't really expect that every layer name will be taken
pass
if workspace is None:
workspace = self.get_default_workspace()
archive = None
ext = "geotiff"
content_type = "image/tiff" if not external else "text/plain"
store_type = "file." if not external else "external."
headers = {
"Content-type": content_type,
"Accept": "application/xml"
}
message = data
if not external:
if isinstance(data, dict):
archive = prepare_upload_bundle(name, data)
message = open(archive, 'rb')
if "tfw" in data:
# If application/archive was used, server crashes with
# a 500 error read in many sites that application/zip
# will do the trick. Successfully tested
headers['Content-type'] = 'application/zip'
ext = "worldimage"
elif isinstance(data, str):
message = open(data, 'rb')
else:
message = data
cs_url = urljoin(
self.service_url,
"workspaces/{}/coveragestores/{}/{}{}".format(
workspace.name, name, store_type, ext
)
)
params = {"configure": "first", "coverageName": name}
try:
r = self.session.put(cs_url, data=message, headers=headers,
params=params)
self._cache.clear()
if r.status_code != 201:
raise UploadError(r.text)
finally:
if hasattr(message, "close"):
message.close()
if archive is not None:
os.unlink(archive)
def harvest_externalgranule(self, data, store):
"""
Harvest a granule into an existing imagemosaic
"""
params = dict()
cs_url = urljoin(
self.service_url,
"workspaces/{}/coveragestores/{}/external.imagemosaic".format(
store.workspace.name,
store.name
)
)
# POST /workspaces//coveragestores//external.imagemosaic
headers = {
"Content-type": "text/plain",
"Accept": "application/xml"
}
r = self.session.post(cs_url, data=data,
headers=headers, params=params)
self._cache.clear()
if r.status_code != 202:
raise UploadError(r.text)
def harvest_uploadgranule(self, data, store):
"""
Harvest a granule into an existing imagemosaic
Difference with harvest_externalgranule?
"""
params = dict()
cs_url = urljoin(
self.service_url,
"workspaces/{}/coveragestores/{}/file.imagemosaic".format(
store.workspace.name,
store.name
)
)
# POST /workspaces//coveragestores//file.imagemosaic
headers = {
"Content-type": "application/zip",
"Accept": "application/xml"
}
message = open(data, 'rb')
try:
r = self.session.post(cs_url, data=message,
headers=headers, params=params)
self._cache.clear()
if r.status_code != 202:
raise UploadError(r.text)
finally:
if hasattr(message, "close"):
message.close()
def mosaic_coverages(self, store):
"""
Print granules of an existing imagemosaic
"""
params = dict()
cs_url = urljoin(
self.service_url,
"workspaces/{}/coveragestores/{}/coverages.json".format(
store.workspace.name,
store.name
)
)
# GET /workspaces//coveragestores//coverages.json
headers = {
"Content-type": "application/json",
"Accept": "application/json"
}
r = self.session.get(cs_url, headers=headers, params=params)
self._cache.clear()
coverages = json.loads(r.text, object_hook=_decode_dict)
return coverages
def mosaic_coverage_schema(self, coverage, store):
"""
Print granules of an existing imagemosaic
"""
params = dict()
cs_url = urljoin(
self.service_url,
"workspaces/{}/coveragestores/{}/coverages/{}/index.json".format(
store.workspace.name,
store.name,
coverage
)
)
# GET /workspaces//coveragestores//coverages//index.json
headers = {
"Content-type": "application/json",
"Accept": "application/json"
}
r = self.session.get(cs_url, headers=headers, params=params)
self._cache.clear()
schema = json.loads(r.text, object_hook=_decode_dict)
return schema
def mosaic_granules(self, coverage, store, filter_=None, limit=None,
offset=None):
"""
Print granules of an existing imagemosaic
"""
params = dict()
if filter_ is not None:
params['filter'] = filter_
if limit is not None:
params['limit'] = limit
if offset is not None:
params['offset'] = offset
p = "workspaces/{}/coveragestores/{}/coverages/{}/index/granules.json"
cs_url = urljoin(
self.service_url,
p.format(store.workspace.name, store.name, coverage)
)
# GET /workspaces//coveragestores//coverages//index/granules.json
headers = {
"Content-type": "application/json",
"Accept": "application/json"
}
r = self.session.get(cs_url, headers=headers, params=params)
self._cache.clear()
granules = json.loads(r.text, object_hook=_decode_dict)
return granules
def mosaic_delete_granule(self, coverage, store, granule_id):
"""
Deletes a granule of an existing imagemosaic
"""
params = dict()
p = "workspaces/{}/coveragestores/{}/coverages/{}"\
+ "/index/granules/{}.json"
cs_url = urljoin(
self.service_url,
p.format(store.workspace.name, store.name, coverage, granule_id)
)
# DELETE /workspaces//coveragestores//coverages//index/granules/.json
headers = {
"Content-type": "application/json",
"Accept": "application/json"
}
r = self.session.delete(cs_url, headers=headers, params=params)
self._cache.clear()
if r.status_code != 200:
raise FailedRequestError(r.text)
def publish_featuretype(self, name, store, native_crs, srs=None,
jdbc_virtual_table=None, native_bbox=None):
"""
Publish a featuretype from data in an existing store
"""
# @todo native_srs doesn't seem to get detected, even when in the DB
# metadata (at least for postgis in geometry_columns) and then there
# will be a misconfigured layer
if native_crs is None:
raise ValueError("must specify native_crs")
srs = srs or native_crs
feature_type = FeatureType(self, store.workspace, store, name)
# because name is the in FeatureType base class, work around that
# and hack in these others that don't have xml properties
feature_type.dirty['name'] = name
feature_type.dirty['srs'] = srs
feature_type.dirty['nativeCRS'] = native_crs
if native_bbox is not None:
feature_type.native_bbox = native_bbox
feature_type.enabled = True
feature_type.title = name
headers = {
"Content-type": "application/xml",
"Accept": "application/xml"
}
resource_url = store.resource_url
if jdbc_virtual_table is not None:
feature_type.metadata = ({
'JDBC_VIRTUAL_TABLE': jdbc_virtual_table
})
params = dict()
resource_url = urljoin(
self.service_url,
"workspaces/{}/datastores/{}/featuretypes.json".format(
store.workspace.name,
store.name
)
)
# What is the use of this request?
params={}
r = self.session.post(resource_url, data=feature_type.message(),
headers=headers, params=params)
if r.status_code < 200 or r.status_code > 299:
raise UploadError(r.text)
feature_type.fetch()
return feature_type
def get_resource(self, name, store=None, workspace=None):
if store is not None and workspace is not None:
if isinstance(workspace, str):
workspace = self.get_workspace(workspace)
if isinstance(store, str):
store = self.get_store(store, workspace)
if store is not None:
return store.get_resources(name)
if store is not None:
candids = [s for s in self.get_resources(store) if s.name == name]
if len(candids) == 0:
return None
elif len(candids) > 1:
raise AmbiguousRequestError
else:
return candids[0]
if workspace is not None:
for store in self.get_stores(workspace):
resource = self.get_resource(name, store)
if resource is not None:
return resource
return None
for ws in self.get_workspaces():
resource = self.get_resource(name, workspace=ws)
if resource is not None:
return resource
return None
def get_resource_by_url(self, url):
xml = self.get_xml(url)
name = xml.find("name").text
if xml.tag == 'featureType':
resource = FeatureType
elif xml.tag == 'coverage':
resource = Coverage
else:
raise Exception('drat')
return resource(self, None, None, name, href=url)
def get_resources(self, store=None, workspace=None):
if isinstance(workspace, str):
workspace = self.get_workspace(workspace)
if isinstance(store, str):
store = self.get_store(store, workspace)
if store is not None:
return store.get_resources()
if workspace is not None:
resources = []
for store in self.get_stores(workspace):
resources.extend(self.get_resources(store))
return resources
resources = []
for ws in self.get_workspaces():
resources.extend(self.get_resources(workspace=ws))
return resources
def get_layer(self, name):
try:
layer = Layer(self, name)
layer.fetch()
return layer
except FailedRequestError:
return None
def get_layers(self, resource=None):
if isinstance(resource, str):
resource = self.get_resource(resource)
layers_url = urljoin(
self.service_url,
"layers.xml"
)
description = self.get_xml(layers_url)
layers = [Layer(self, l.find("name").text)
for l in description.findall("layer")]
if resource is not None:
layers = [l for l in layers if l.resource.href == resource.href]
# TODO: Filter by style
return layers
def get_layergroup(self, name=None, workspace=None):
try:
path_parts = "layergroups/{}.xml".format(name)
if workspace is not None:
wks_name = _name(workspace)
path_parts = "workspaces/{}".format(wks_name) + path_parts
group_url = urljoin(
self.service_url,
path_parts
)
group = self.get_xml(group_url)
if group.find("workspace"):
wks_name = group.find("workspace").find("name").text
else:
wks_name = None
return LayerGroup(self, group.find("name").text, wks_name)
except FailedRequestError:
return None
def get_layergroups(self, workspace=None):
wks_name = None
path_parts = 'layergroups.xml'
if workspace is not None:
wks_name = _name(workspace)
path_parts = 'workspaces/{}/{}'.format(wks_name, path_parts)
groups_url = urljoin(
self.service_url,
path_parts
)
groups = self.get_xml(groups_url)
return [LayerGroup(self, g.find("name").text, wks_name)
for g in groups.findall("layerGroup")]
def create_layergroup(self, name, layers=(), styles=(), bounds=None,
abstract=None, title=None, workspace=None):
if any(g.name == name for g in self.get_layergroups()):
msg = "LayerGroup named {} already exists!"
raise ConflictingDataError(msg.format(name))
else:
return UnsavedLayerGroup(self, name, layers, styles, bounds,
abstract, title, workspace)
def get_style(self, name, workspace=None):
"""
Find a Style in the catalog if one exists that matches the given name.
If name is fully qualified in the form of `workspace:name` the workspace
may be ommitted.
:param name: name of the style to find
:param workspace: optional workspace to search in
"""
if ':' in name:
workspace, name = name.split(':', 1)
try:
style = Style(self, name, _name(workspace))
style.fetch()
except FailedRequestError:
style = None
return style
def get_style_by_url(self, style_workspace_url):
try:
dom = self.get_xml(style_workspace_url)
except FailedRequestError:
return None
rest_parts = style_workspace_url.replace(self.service_url, '')\
.split('/')
# check for /workspaces//styles/
workspace = None
if 'workspaces' in rest_parts:
workspace = rest_parts[rest_parts.index('workspaces') + 1]
return Style(self, dom.find("name").text, workspace)
def get_styles(self, workspace=None):
styles_xml = "styles.xml"
if workspace is not None:
styles_xml = "workspaces/{0}/styles.xml".format(_name(workspace))
styles_url = urljoin(
self.service_url,
styles_xml
)
description = self.get_xml(styles_url)
return [Style(self, s.find('name').text)
for s in description.findall("style")]
def create_style(self, name, data, overwrite=False, workspace=None,
style_format="sld10", raw=False):
style = self.get_style(name, workspace)
if not overwrite and style is not None:
msg = "There is already a style named {}".format(name)
raise ConflictingDataError(msg)
if not overwrite or style is None:
headers = {
"Content-type": "application/xml",
"Accept": "application/xml"
}
xml = ""
xml = xml.format(name)
style = Style(self, name, workspace, style_format)
r = self.session.post(style.create_href, data=xml, headers=headers)
if r.status_code < 200 or r.status_code > 299:
raise UploadError(r.text)
headers = {
"Content-type": style.content_type,
"Accept": "application/xml"
}
body_href = style.body_href
if raw:
body_href += "?raw=true"
r = self.session.put(body_href, data=data, headers=headers)
if r.status_code < 200 or r.status_code > 299:
raise UploadError(r.text)
self._cache.pop(style.href, None)
self._cache.pop(style.body_href, None)
def create_workspace(self, name):
xml = "{name}"\
.format(name=name)
headers = {"Content-Type": "application/xml"}
workspace_url = urljoin(
self.service_url,
"workspaces/"
)
r = self.session.post(workspace_url, data=xml, headers=headers)
assert 200 <= r.status_code < 300,\
"Tried to create workspace but got {}: {}".format(r.status_code,
r.text)
self._cache.pop(urljoin(self.service_url, "workspaces.xml"), None)
return self.get_workspace(name)
def get_workspaces(self):
rest_url = urljoin(
self.service_url,
"workspaces.xml"
)
description = self.get_xml(rest_url)
return [workspace_from_index(self, node)
for node in description.findall("workspace")]
def get_workspace(self, name):
candidates = [w for w in self.get_workspaces() if w.name == name]
if len(candidates) == 0:
return None
elif len(candidates) > 1:
raise AmbiguousRequestError()
else:
return candidates[0]
def get_default_workspace(self):
ws = Workspace(self, "default")
# must fetch and resolve the 'real' workspace from the response
ws.fetch()
return workspace_from_index(self, ws.dom)
def set_default_workspace(self, name):
if hasattr(name, 'name'):
name = name.name
workspace = self.get_workspace(name)
if workspace is not None:
headers = {"Content-Type": "application/xml"}
default_workspace_url = urljoin(
self.service_url,
"workspaces/default.xml"
)
msg = "{}".format(name)
r = self.session.put(default_workspace_url, data=msg,
headers=headers)
assert 200 <= r.status_code < 300,\
"Error setting default workspace: {}: {}"\
.format(r.status_code, r.text)
self._cache.pop(default_workspace_url, None)
self._cache.pop("{}/workspaces.xml".format(self.service_url), None)
else:
raise FailedRequestError("no workspace named '{}'".format(name))
def _name(named):
"""
Get the name out of an object. This varies based on the type of the input:
* the "name" of a string is itself
* the "name" of None is itself
* the "name" of an object with a property named name is that property -
as long as it's a string
* otherwise, we raise a ValueError
"""
if isinstance(named, str) or named is None:
return named
elif hasattr(named, 'name') and isinstance(named.name, str):
return named.name
else:
msg = "Can't interpret {} as a name or a configuration object"
raise ValueError(msg.format(named))
class UploadError(Exception):
pass
class ConflictingDataError(Exception):
pass
class AmbiguousRequestError(Exception):
pass
class FailedRequestError(Exception):
pass