236 lines
9.0 KiB
Python
236 lines
9.0 KiB
Python
|
#!/usr/bin/env python
|
||
|
# -*- coding: utf-8 -*-
|
||
|
#
|
||
|
# @author: Francesco Mangiacrapa
|
||
|
#
|
||
|
# Created on 2023/02/16
|
||
|
#
|
||
|
import requests
|
||
|
from JSON_Reader import JSON_Reader
|
||
|
from IAM_Token_Client import IAM_Token_Client
|
||
|
|
||
|
# Location of the sample files (dataset JSON, PDF, images, shapefile) used by this client
folder_test_files_url = "https://code-repo.d4science.org/gCubeSystem/geoportal-esquilino-sample-python-client/raw/branch/master/test_files/"
dataset_json_filename = "test_dataset_esquilino.json"
pdf_filename = "file-example_PDF_1MB.pdf"
image_filenames = ["arco-di-san-vito.jpg", "piazza_esquilino.jpg"]
shapefile_filename = "area_di_scavo.shp"

# Client credentials and context handed to IAM_Token_Client to obtain the UMA token.
# NOTE(review): ID and secret are empty in this sample - presumably they must be
# filled in before running; confirm with the service owner.
service_Client_ID = ""
service_Client_Secret = ""
gcube_context = "/d4science.research-infrastructures.eu/gCubeApps/Esquiline"

# Endpoints
# D4Science IAM token endpoint
IAM_URL = "https://accounts.d4science.org/auth/realms/d4science/protocol/openid-connect/token"
# Base URL of the Geoportal service.
# NOTE(review): the URL contains a double slash before "geoportal-service" - confirm it is intended.
geoportal_service_endpoint = "https://geoportal-cms.int.d4science.net//geoportal-service/srv"

# Identifier of the UCD the sample project is created for
UCD_ID = "esquilino"
|
||
|
|
||
|
|
||
|
def request_UMA_Token():
    """Return the value for an HTTP ``Authorization`` header.

    Builds an ``IAM_Token_Client`` from ``IAM_URL`` and the configured client
    ID/secret, asks it for the UMA token of ``gcube_context`` and returns that
    token prefixed with ``"Bearer "``.
    """
    token_client = IAM_Token_Client(IAM_URL, service_Client_ID, service_Client_Secret)
    return "Bearer " + token_client.getUmaToken(gcube_context)
|
||
|
|
||
|
|
||
|
class Geoportal_Client_Esquilino:
    """Sample client for the Geoportal service endpoints of the Esquilino UCD.

    On construction it prepares the fileset-registration payloads for the
    sample files; the request methods each return the ``requests`` response
    object of the call they perform.
    """

    def __init__(self):
        self.init_sample_filesets_for_esquilino()

    @staticmethod
    def _build_fileset(section, parent_path, field_name, filename):
        """Build one fileset-registration payload.

        :param section: name of the document section (used in fieldDefinitionPath)
        :param parent_path: value of the "parentPath" entry
        :param field_name: value of the "fieldName" entry
        :param filename: file name appended to ``folder_test_files_url``
        :return: the payload dict
        """
        return {
            "toSetAccess": {
                "_policy": "OPEN",
                "_license": "CCBYSA.4.0",
            },
            "clashOption": "REPLACE_EXISTING",
            "fieldDefinitionPath": "$.%s._children[?(@.%s)]" % (section, field_name),
            "parentPath": parent_path,
            "fieldName": field_name,
            "streams": [
                {
                    "url": folder_test_files_url + filename,
                    "filename": filename,
                }
            ],
        }

    def init_sample_filesets_for_esquilino(self):
        """Create the four fixed sample fileset payloads as instance attributes."""
        self.fileset_fonteInformazione = self._build_fileset(
            "fonteInformazione", "$.fonteInformazione", "filesetIta", pdf_filename)
        self.fileset_documentazioneScientifica = self._build_fileset(
            "documentazioneScientifica", "$.documentazioneScientifica", "fileset", pdf_filename)
        self.fileset_datiAltimetrici = self._build_fileset(
            "datiAltimetrici", "$.datiAltimetrici", "fileset", shapefile_filename)
        self.fileset_planimetria = self._build_fileset(
            "planimetria", "$.planimetria", "fileset", shapefile_filename)

    def build_step(self, step_id):
        """Return the payload for a step request: ``{"stepID": step_id}``."""
        return {"stepID": step_id}

    def build_image_fileset(self, image_index, image_filename):
        """Build the fileset payload for one representative image.

        'immaginiRappresentative' is managed as an Array by definition in the
        Esquilino UCD Model, so the parent path addresses the element at
        ``image_index``.

        :param image_index: position of the image inside the array
        :param image_filename: file name appended to ``folder_test_files_url``
        :return: the payload dict
        """
        indexed_parent = "$.immaginiRappresentative[{}]".format(image_index)
        return self._build_fileset(
            "immaginiRappresentative", indexed_parent, "fileset", image_filename)

    def _auth_headers(self):
        """Return the JSON content-type header plus a freshly requested token."""
        return {
            'Content-Type': 'application/json; charset=utf-8',
            'Authorization': request_UMA_Token(),
        }

    def _get(self, request_url):
        """Issue an authorized GET with an empty body and return the response."""
        auth_headers = self._auth_headers()
        print("calling: " + request_url)
        return requests.request("GET", request_url, headers=auth_headers, data="")

    def _post(self, request_url, json_data):
        """Issue an authorized POST carrying ``json_data`` as JSON and return the response."""
        auth_headers = self._auth_headers()
        print("calling: " + request_url)
        print("json_data: " + str(json_data))
        return requests.post(request_url, headers=auth_headers, json=json_data)

    def get_ucd(self, ucd_id):
        """GET ``/ucd/<ucd_id>`` from the Geoportal service."""
        print("\nget_ucd()")
        return self._get(geoportal_service_endpoint + "/ucd/" + ucd_id)

    def get_project(self, ucd_id, project_id):
        """GET ``/projects/<ucd_id>/<project_id>`` from the Geoportal service."""
        print("\nget_project()")
        return self._get(
            geoportal_service_endpoint + "/projects/" + ucd_id + "/" + project_id)

    def create_project(self, ucd_id, json_data):
        """POST ``json_data`` to ``/projects/<ucd_id>``."""
        print("\ncreate_project()")
        return self._post(geoportal_service_endpoint + "/projects/" + ucd_id, json_data)

    def registrer_fileset(self, ucd_id, project_id, json_data):
        """POST a fileset payload to ``/projects/<ucd_id>/registerFiles/<project_id>``."""
        print("\nregistrer_fileset()")
        return self._post(
            geoportal_service_endpoint + "/projects/" + ucd_id + "/registerFiles/" + project_id,
            json_data)

    def perform_step(self, ucd_id, project_id, json_data):
        """POST a step payload to ``/projects/<ucd_id>/step/<project_id>``."""
        print("\nperform_step()")
        return self._post(
            geoportal_service_endpoint + "/projects/" + ucd_id + "/step/" + project_id,
            json_data)
|
||
|
|
||
|
|
||
|
def main():
    """Run the sample workflow against the Geoportal service.

    Reads the test dataset JSON, creates a project for ``UCD_ID`` from it,
    registers the sample filesets on the created project (one image fileset
    per entry of ``image_filenames``) and finally performs the "PUBLISH" step.
    The status code of every response is printed.
    """
    json_esquilino_url = folder_test_files_url + dataset_json_filename
    json_reader = JSON_Reader(json_esquilino_url)
    print("\n\nREAD TEST ESQUILINO JSON")
    JSON_Reader.pretty_print_json(json_reader.data, 3)

    gce = Geoportal_Client_Esquilino()

    # Optional: read the UCD before creating the project
    # print("\n\nRead UCD for id "+UCD_ID)
    # response = gce.get_ucd(UCD_ID)
    # print("\n\nRequest UCD response for %s" % UCD_ID)
    # print("Response code is %s" % response.status_code)
    # JSON_Reader.pretty_print_json(response.json(), 3)

    print("\n\nCreate a Project for %s" % UCD_ID)
    response = gce.create_project(UCD_ID, json_reader.data)
    print("Response code is %s" % response.status_code)
    print("Created the Project, response is: %s" % response.json())

    # Reading the project_id from the response (just created Project)
    json_data = JSON_Reader.to_json(response.text)
    project_id = json_data["_id"]

    # Filesets are registered in this order: Fonte Informazione,
    # Documentazione Scientifica, Immagini, Dati Altimetrici, Planimetria.
    filesets = [
        gce.fileset_fonteInformazione,
        gce.fileset_documentazioneScientifica,
    ]
    # One image fileset per configured file name; the array index follows the
    # position in image_filenames instead of being hard-coded.
    filesets.extend(
        gce.build_image_fileset(image_index, image_filename)
        for image_index, image_filename in enumerate(image_filenames)
    )
    filesets.extend([
        gce.fileset_datiAltimetrici,
        gce.fileset_planimetria,
    ])
    for fileset in filesets:
        response = gce.registrer_fileset(UCD_ID, project_id, fileset)
        print("Response code is %s" % response.status_code)

    # Performing STEP PUBLISH
    step_publish = gce.build_step("PUBLISH")
    response = gce.perform_step(UCD_ID, project_id, step_publish)
    print("Response code is %s" % response.status_code)

    # Optional: read back a project by id
    # project_id = ""
    # print("\n\nRead a Project for ucd "+UCD_ID+" and project "+project_id)
    # response = gce.get_project(UCD_ID, project_id)
    # print("Response code is %s" % response.status_code)
    # print("\n\nCreate a Project response %s" % response.json())
|
||
|
|
||
|
|
||
|
# Run the sample workflow only when this file is executed as a script, so that
# importing the module does not trigger the network calls in main().
if __name__ == "__main__":
    main()
|