ckanext-d4science_theme/ckanext/d4science_theme/helpers.py

716 lines
25 KiB
Python
Raw Normal View History

2023-09-21 17:01:55 +02:00
import ckan.authz as authz
import ckan.model as model
from webhelpers.html import escape, HTML, literal, url_escape
from webhelpers.text import truncate
import ckan.lib.helpers as h
import ckan.logic as logic
from ckan.common import config
from ckanext.d4science_theme.d4sdiscovery.d4s_namespaces_controller import D4S_Namespaces_Controller
from ckanext.d4science_theme.d4sdiscovery.d4s_namespaces_extras_util import D4S_Namespaces_Extra_Util
from ckanext.d4science_theme.qrcodelink.generate_qrcode import D4S_QrCode
import urllib2
from ckan.common import (
_, ungettext, g, c, request, session, json, OrderedDict
)
import random
from operator import itemgetter
from logging import getLogger
import base64
import sys, os, re
import ConfigParser
import collections
# Module-level logger shared by every helper below.
log = getLogger(__name__)
# Session key under which the system-type field dict is cached; also the
# suffix of the 'ckan.d4science_theme.<name>' config option that is read.
systemtype_field = 'systemtypefield'
# Fallback used when the config option above is not set.
systemtype_field_default_value = 'system:type'
# Suffix of the config option holding the IC proxy URL, and its fallback.
ic_proxy_url_field = 'ic_proxy_url'
ic_proxy_url_field_default_value = "https://registry.d4science.org/icproxy/gcube/service"
# Suffix of the config option holding the application token (no fallback).
application_token_field = 'application_token'
# Fallback resource id and the suffix of the config option that overrides it.
namespaces_generic_resource_id_default_value = "23d827cd-ba8e-4d8c-9ab4-6303bdb7d1db"
namespaces_gr_id_fieldname = "namespaces_generic_resource_id"
# Session key / config option suffix for the namespace separator, and its fallback.
namespaceseparator_field = 'namespace_separator'
namespaceseparator_field_default_value = ':'
# Palette from which get_color_for_type() picks a colour per system type.
# NOTE(review): two entries carry a trailing space and '#2ecc71' appears
# twice -- confirm whether that is intentional.
systemtype_rgb_colors = ['#c0392b ', '#585858', '#04407C', '#9b59b6', '#2ecc71', '#16a085', '#7f8c8d ', '#2ecc71',
'#FA8072', '#00FFFF', '#C76611', '#f39c12', '#800000']
# Session key under which the {system type: colour} mapping is cached.
systemtype_field_colors = 'systemtype_field_colors'
# Returned as-is by get_content_moderator_system_placeholder().
systemtype_cms_fields_placeholders = {'prefix': 'system:cm_', 'item_status': 'system:cm_item_status'}
# Not referenced by the helpers visible in this module -- presumably used by
# templates or other modules; verify before renaming (note the spelling).
NOCATEOGORY = 'nocategory'
# Prefix of the session keys used by get_ckan_translate_for() to cache results.
TRANSLATE_OF_ = 'translate_of_'
# Lazily filled by get_d4s_namespace_controller() with the dict of settings
# ('ic_proxy_url', 'application_token', 'resource_id') used to load namespaces.
ctg_namespace_ctrl = None
# ADDED BY FRANCESCO.MANGIACRAPA, related to Task #5196
def get_user_role_for_group_or_org(group_id, user_name):
    '''Return the role that ``user_name`` holds in the given group or
    organization.

    Thin wrapper around ``authz.users_role_for_group_or_org``; privileges
    that cascade through a group hierarchy are not taken into account.
    '''
    role = authz.users_role_for_group_or_org(group_id, user_name)
    return role
# ADDED BY FRANCESCO.MANGIACRAPA, related to breadcrumb for Group
def get_parents_for_group(group_name_or_id):
    '''Return the parent hierarchy of a group.

    :param group_name_or_id: value handed to ``model.Group.get``
    :returns: whatever ``model.Group.get_parent_group_hierarchy`` yields for
        the group, or None when the group cannot be found
    '''
    found = model.Group.get(group_name_or_id)
    if not found:
        return None
    return model.Group.get_parent_group_hierarchy(found)
# ADDED BY FRANCESCO.MANGIACRAPA
def get_header_param(parameter_name, default=None):
    '''Give templates access to a header of the current request.

    :param parameter_name: name of the header to read
    :param default: value returned when the header is absent
    '''
    headers = request.headers
    return headers.get(parameter_name, default)
# ADDED BY FRANCESCO.MANGIACRAPA
def get_request_param(parameter_name, default=None):
    '''Give templates access to a query-string parameter of the current
    request.

    :param parameter_name: name of the parameter to read
    :param default: value returned when the parameter is absent
    '''
    params = request.params
    return params.get(parameter_name, default)
# ADDED BY FRANCESCO.MANGIACRAPA
def get_cookie_value(cookie_name, default=None):
    '''Give templates access to a cookie of the current request.

    :param cookie_name: name of the cookie to read
    :param default: value returned when the cookie is not present
        (previously accepted but ignored)
    :returns: the cookie value, or ``default`` when the cookie is missing
    '''
    value = request.cookies.get(cookie_name)
    if value is None:
        # Use the module logger rather than printing to stdout; lazy %-args
        # also avoid the concatenation error on non-string values.
        log.debug("cookie: %s, has value None", cookie_name)
        return default
    log.debug("cookie: %s, has value %s", cookie_name, value)
    return value
# Updated BY FRANCESCO.MANGIACRAPA, added allow_html
def markdown_extract_html(text, extract_length=190, allow_html=False):
    '''Render markdown text and return it, optionally shortened.

    :param text: markdown source; a falsy value yields ``''``
    :param extract_length: maximum length of the result; 0 disables
        truncation
    :param allow_html: when False the HTML tags produced by the markdown
        rendering are stripped, when True they are kept
    :returns: a ``literal`` holding the (possibly truncated) text
    '''
    if not text:
        return ''
    rendered = h.markdown(text.strip()) if allow_html \
        else h.RE_MD_HTML_TAGS.sub('', h.markdown(text))
    needs_truncation = extract_length and not len(rendered) < extract_length
    if not needs_truncation:
        return literal(rendered)
    shortened = truncate(rendered, length=extract_length, indicator='...', whole_word=True)
    return literal(unicode(shortened))
def get_systemtype_field_dict_from_session():
    '''Return the dict describing the system-type field, caching it in the
    session.

    When the session has no entry yet, the field id is read from the
    'ckan.d4science_theme.systemtypefield' config option (falling back to
    ``systemtype_field_default_value``) and a dict with the keys 'id',
    'name' (id without the namespace separator) and 'title' (id without the
    namespace) is stored in the session and returned.
    '''
    cached = session.get(systemtype_field)
    if cached is not None:
        log.debug(systemtype_field + " found in session having value: %s" % cached)
        return cached
    log.info(systemtype_field + " not found in session, loading from config")

    field_id = config.get('ckan.d4science_theme.' + systemtype_field)
    if field_id is None:
        log.info(
            systemtype_field + " field does not exist in production.ini, returning default value %s" % systemtype_field_default_value)
        field_id = systemtype_field_default_value

    separator = get_namespace_separator_from_session()
    log.debug("Replacing %s" % separator + " with empty string for key %s" % systemtype_field)
    field_name = field_id.replace(separator, "")
    field_title = purge_namespace_to_fieldname(field_id)

    log.debug("Setting %s" % field_id + " in session for key %s" % systemtype_field)
    session[systemtype_field] = {
        'id': field_id,
        'name': field_name,
        'title': field_title,
    }
    session.save()
    return session[systemtype_field]
def get_d4s_namespace_controller():
    '''Return the D4S_Namespaces_Controller singleton with its namespaces
    loaded.

    The settings needed to load the namespaces (IC proxy URL, resource id,
    application token) are read from the 'ckan.d4science_theme.*' config
    options and kept in the module-level ``ctg_namespace_ctrl`` dict. When
    that dict already exists it is used to load the namespaces; if they
    come back empty the settings are read again and the load is retried.
    '''
    global ctg_namespace_ctrl
    d4s_extras_controller = D4S_Namespaces_Controller.getInstance()

    if ctg_namespace_ctrl is not None:
        log.info("ctg_namespace_ctrl with configurations is NOT None")
        the_namespaces = d4s_extras_controller.load_namespaces(ctg_namespace_ctrl['ic_proxy_url'],
                                                               ctg_namespace_ctrl['resource_id'],
                                                               ctg_namespace_ctrl['application_token'])
        log.debug("the_namespaces are %s" % the_namespaces)
        if the_namespaces:
            log.info("d4s_namespaces_controller found and the namespaces property is not empty: %s" % d4s_extras_controller)
            return d4s_extras_controller
        # Empty result: fall through and rebuild the settings from config.
        log.info("D4S_Namespaces_Controller obj with none or empty namespaces, going to read them")
    else:
        log.info("ctg_namespace_ctrl with configurations is None, instancing it")

    ic_proxy_url_value = config.get('ckan.d4science_theme.' + ic_proxy_url_field)
    if ic_proxy_url_value is None:
        log.info(
            "ckan.d4science_theme." + ic_proxy_url_field + " field does not exist in production.ini, returning default value %s" % ic_proxy_url_field_default_value)
        ic_proxy_url_value = ic_proxy_url_field_default_value

    # There is no default for the token: a missing option is only reported.
    application_token_fieldname = config.get('ckan.d4science_theme.' + application_token_field)
    if application_token_fieldname is None:
        log.error("ckan.d4science_theme." + application_token_field + " field does not exist in production.ini!!!")

    namespaces_gr_id_fieldname_value = config.get('ckan.d4science_theme.' + namespaces_gr_id_fieldname)
    if namespaces_gr_id_fieldname_value is None:
        # Report the option that is actually missing (the message used to
        # name the application token option).
        log.error(
            "ckan.d4science_theme." + namespaces_gr_id_fieldname + " field does not exist in production.ini, using default value %s" % namespaces_generic_resource_id_default_value)
        namespaces_gr_id_fieldname_value = namespaces_generic_resource_id_default_value

    # Remember the settings so later calls can load namespaces directly.
    ctg_namespace_ctrl = {'ic_proxy_url': ic_proxy_url_value,
                          'application_token': application_token_fieldname,
                          'resource_id': namespaces_gr_id_fieldname_value}
    d4s_extras_controller.load_namespaces(ctg_namespace_ctrl['ic_proxy_url'],
                                          ctg_namespace_ctrl['resource_id'],
                                          ctg_namespace_ctrl['application_token'])
    return d4s_extras_controller
def get_extras_indexed_for_namespaces(extras):
    '''Group the package extras by namespace.

    The extras are first filtered through ``get_extras`` and then handed,
    together with the namespaces dict, to
    ``D4S_Namespaces_Extra_Util.get_extras_indexed_for_namespaces``.
    '''
    namespaces = get_namespaces_dict()
    visible_extras = get_extras(extras)
    indexer = D4S_Namespaces_Extra_Util()
    return indexer.get_extras_indexed_for_namespaces(namespaces, visible_extras)
def get_namespaces_dict():
    '''Return the namespaces dict exposed by the namespaces controller, or
    an empty dict when no controller is available.'''
    controller = get_d4s_namespace_controller()
    if controller is None:
        log.info("local_extras_controller is null, returning empty dictionary for namespaces")
        return {}
    return controller.get_dict_ctg_namespaces()
def get_extra_for_category(extras_indexed_for_categories, key_category):
    '''Return the extras stored for one category.

    :param extras_indexed_for_categories: mapping of category key to an
        object exposing an ``extras`` attribute
    :param key_category: category key to look up
    :returns: the ``extras`` of the matching entry, or an empty list when
        the key is not present
    '''
    # ``in`` replaces the deprecated ``dict.has_key``.
    if key_category in extras_indexed_for_categories:
        return extras_indexed_for_categories[key_category].extras
    return []
def get_systemtype_value_from_extras(package, extras=None):
    '''Return the value of the extra whose key is the system-type field id.

    :param package: not used by this function
    :param extras: iterable of dicts with 'key' and 'value' entries
    :returns: the matching value, or 'No Type' when ``extras`` is None or
        holds no such key
    '''
    systemtype_dict = get_systemtype_field_dict_from_session()
    fallback = 'No Type'
    if extras is None:
        return fallback
    for entry in extras:
        key = entry['key']
        value = entry['value']
        log.debug("key is %s" % key)
        log.debug("value is %s" % value)
        if key == str(systemtype_dict['id']):
            return value
    return fallback
def get_namespace_separator_from_session():
    '''Return the character(s) separating a namespace from a field name.

    The value cached in the session is used when present. Otherwise it is
    read from the 'ckan.d4science_theme.namespace_separator' config option
    (falling back to ``namespaceseparator_field_default_value``), stored in
    the session and returned.
    '''
    separator = session.get(namespaceseparator_field)
    if separator is not None:
        log.debug(namespaceseparator_field + " found in session: %s" % separator)
        return separator
    log.info(namespaceseparator_field + " not found in session, loading from config")

    namespace_sep = config.get('ckan.d4science_theme.' + namespaceseparator_field)
    if namespace_sep is None:
        log.info(
            namespaceseparator_field + " field does not exist in production.ini, returning default value %s" % namespaceseparator_field_default_value)
        namespace_sep = namespaceseparator_field_default_value

    log.debug("Setting %s" % namespace_sep + " in session for key %s" % namespaceseparator_field)
    session[namespaceseparator_field] = namespace_sep
    # Persist the entry like the other session caches in this module do;
    # without save() the value is not guaranteed to outlive the request.
    session.save()
    return namespace_sep
def get_extras(package_extras, auto_clean=False, subs=None, exclude=None):
    '''Prepare package extras for output as a list of (key, value) pairs.

    :param package_extras: the package extras
    :type package_extras: iterable of dicts with 'key' and 'value'
    :param auto_clean: if true, replace -_ with spaces and title-case keys
        that have no substitute
    :type auto_clean: bool
    :param subs: substitutes to use instead of given keys
    :type subs: dict {'key': 'replacement'}
    :param exclude: keys to exclude; when empty the keys configured in
        ``g.package_hide_extras`` are used
    :type exclude: list of strings
    '''
    hidden_keys = exclude if exclude else g.package_hide_extras
    pairs = []
    for item in package_extras:
        # Extras flagged as deleted are never shown.
        if item.get('state') == 'deleted':
            continue
        key, value = item['key'], item['value']
        if key in hidden_keys:
            continue
        if subs and key in subs:
            key = subs[key]
        elif auto_clean:
            key = key.replace('_', ' ').replace('-', ' ').title()
        if isinstance(value, (list, tuple)):
            value = ", ".join(map(unicode, value))
        pairs.append((key, value))
    return pairs
def purge_namespace_to_fieldname(fieldname):
    '''Strip the namespace prefix from a field name.

    Everything up to and including the first namespace separator is
    removed. The field name is returned unchanged when it does not contain
    the separator or when nothing would be left after it.

    :param fieldname: field name, possibly prefixed by a namespace
    :returns: the field name without namespace, or ``""`` for None
    '''
    separator = get_namespace_separator_from_session()
    if fieldname is None:
        return ""
    # An empty separator cannot delimit anything; without this guard the
    # first character of the name would be dropped.
    if not separator or separator not in fieldname:
        return fieldname
    # partition() skips the whole separator, so configured separators
    # longer than one character are handled too.
    remainder = fieldname.partition(separator)[2]
    return remainder if remainder else fieldname
def purge_namespace_to_string(facet):
    '''Return the text preceding the first ":" in the string form of a
    search facet entry.

    :param facet: key looked up in ``c.search_facets``
    :returns: ``""`` when the facet is missing or has no items; otherwise
        the stringified facet entry cut at the first ":" (or the whole
        string when it holds no ":")

    NOTE(review): the entry stored in ``c.search_facets`` is stringified
    as a whole, exactly as before; the previous code then sliced the entry
    itself instead of the string, which cannot work for a dict. Confirm
    which string the namespace is meant to be extracted from.
    '''
    if not c.search_facets:
        return ""
    facet_entry = c.search_facets.get(facet)
    if not facet_entry or not facet_entry.get('items'):
        return ""
    facet_text = str(facet_entry)
    log.debug("facet_name %s", facet_text)
    head, found_separator, _ = facet_text.partition(":")
    return head if found_separator else facet_text
def count_facet_items_dict(facet, limit=None, exclude_active=False):
    '''Count the items of a search facet.

    Items with a blank name are skipped. Items currently selected in the
    request parameters are skipped as well when ``exclude_active`` is true.

    :param facet: key looked up in ``c.search_facets``
    :param limit: accepted for signature compatibility; not used
    :param exclude_active: leave out items that are active in the request
    :returns: number of counted items, 0 when the facet is missing or empty
    '''
    if not c.search_facets or \
            not c.search_facets.get(facet) or \
            not c.search_facets.get(facet).get('items'):
        return 0
    # The request parameters do not change while counting: read them once.
    active_params = request.params.items()
    total = 0
    for facet_item in c.search_facets.get(facet)['items']:
        name = facet_item['name']
        if not name.strip():
            continue
        if exclude_active and (facet, name) in active_params:
            continue
        total += 1
    log.debug("total facet: %s" % facet + " are %d" % total)
    return total
def random_color():
    '''Return a tuple holding the values 255, 0 and 0 in random order.'''
    channels = [255, 0, 0]
    random.shuffle(channels)
    shuffled = tuple(channels)
    return shuffled
def check_url(the_url, timeout=None):
    '''Tell whether ``the_url`` can be opened.

    :param the_url: URL (or ``urllib2.Request``) to open
    :param timeout: optional timeout in seconds handed to
        ``urllib2.urlopen``; when None the call is made without a timeout
        argument, as before
    :returns: True when the URL was opened, False on any error
    '''
    open_kwargs = {}
    if timeout is not None:
        open_kwargs['timeout'] = timeout
    try:
        response = urllib2.urlopen(the_url, **open_kwargs)
        # Release the connection instead of leaving it to the garbage
        # collector.
        response.close()
        return True
    except Exception as error:
        # Best effort by design: HTTP errors, URL errors and anything else
        # all mean "not reachable".
        log.debug("check_url failed for %s: %s", the_url, error)
        return False
def get_color_for_type(systemtype_field_value):
    '''Return the colour assigned to a system type.

    Assignments are kept in the session. A type seen for the first time
    gets the next unused entry of ``systemtype_rgb_colors``; once every
    entry has been handed out a random entry is picked instead.
    '''
    assigned = session.get(systemtype_field_colors)
    if assigned is not None:
        log.debug("color: " + systemtype_field_colors + " found in session having value: %s" % assigned)
    else:
        log.info("color: " + systemtype_field_colors + " not found in session, creating new one")
        assigned = {}
        session[systemtype_field_colors] = assigned

    color = assigned.get(systemtype_field_value)
    if color is None:
        used = len(assigned)
        available = len(systemtype_rgb_colors)
        if used < available:
            index = used
        else:
            index = random.randint(0, available - 1)
        color = systemtype_rgb_colors[index]
        assigned[systemtype_field_value] = color
        session[systemtype_field_colors] = assigned
    session.save()
    return color
def ordered_dictionary(list_to_be_sorted, property='name', ordering="asc"):
    '''Return the items sorted by one of their keys.

    :param list_to_be_sorted: list of dicts; a falsy value is returned
        unchanged
    :param property: key to sort by
    :param ordering: "asc" for ascending order, anything else for
        descending order
    '''
    if not list_to_be_sorted:
        return list_to_be_sorted
    descending = ordering != "asc"
    sort_key = itemgetter(property)
    return sorted(list_to_be_sorted, key=sort_key, reverse=descending)
def qrcode_for_url(url):
    '''Return the base64-encoded QR code image for ``url``.

    :param url: URL to encode; a falsy value yields ``""``
    :returns: base64 encoding of the image file produced by ``D4S_QrCode``,
        or ``""`` when the image cannot be produced or read
    '''
    if not url:
        return ""
    try:
        qr_code = D4S_QrCode(url)
        image_path = qr_code.get_qrcode_path()
        with open(image_path, "rb") as image_file:
            return base64.b64encode(image_file.read())
    except Exception as error:
        # Lazy %-args: building the message can no longer raise for a
        # non-string url, and url and error are now separated.
        log.error("Error on getting qrcode for url: %s, error: %s", url, error)
    return ""
def get_list_of_organizations(limit=10, sort='packages'):
    '''Return display info for the organizations given by the
    'organization_list' action.

    Each entry is a dict with 'name', 'url' and, when available,
    'package_count' and 'display_name'. The URL is the organization's icon
    when one exists, else its ``image_url``, else a placeholder image.

    :param limit: passed to the action as 'limit'
    :param sort: passed to the action as 'sort' when truthy
    :returns: list of dicts; an empty list when the action fails
    '''
    browsable = []
    try:
        query = {}
        if sort:
            query['sort'] = sort
        query['limit'] = limit
        query['all_fields'] = True
        organizations = logic.get_action('organization_list')({}, query)
        for organization in organizations:
            try:
                if not organization['name']:
                    continue
                entry = {'name': organization['name']}
                for optional_key in ('package_count', 'display_name'):
                    if optional_key in organization:
                        entry[optional_key] = organization[optional_key]
                icon_url = get_url_to_icon_for_ckan_entity(organization['name'], 'organization', False)
                if icon_url:
                    # First choice: the icon shipped for this organization.
                    entry['url'] = icon_url
                elif 'image_url' in organization and organization['image_url']:
                    # Second choice: the image set on the organization.
                    entry['url'] = organization['image_url']
                else:
                    # Last resort: the default placeholder.
                    entry['url'] = h.url_for_static('/images/organisations/icon/placeholder-organization.png')
                browsable.append(entry)
            except (logic.NotFound, logic.ValidationError, logic.NotAuthorized) as error:
                # A failing organization is skipped, not fatal.
                log.warn("Error on putting organization: %s" % error)
        log.info("browse %d" % len(organizations) + " organisation/s")
    except (logic.NotFound, logic.ValidationError, logic.NotAuthorized) as error:
        log.error("Error on getting organizations: %s" % error)
        return []
    return browsable
def get_list_of_groups(limit=10, sort='package_count'):
    '''Return display info for the groups given by the 'group_list' action.

    Each entry is a dict with 'name', 'url' and, when available,
    'package_count' and 'display_name'. The URL is the group's
    ``image_url`` when set, otherwise the icon resolved by
    ``get_url_to_icon_for_ckan_entity``.

    :param limit: passed to the action as 'limit'
    :param sort: passed to the action as 'sort' when truthy
    :returns: list of dicts; an empty list when the action fails
    '''
    to_browse_groups = []
    try:
        data = {'limit': limit, 'all_fields': True}
        if sort:
            data['sort'] = sort
        ordered_groups = logic.get_action('group_list')({}, data)
        for group in ordered_groups:
            try:
                if not group['name']:
                    continue
                to_browse_obj = {'name': group['name']}
                if 'package_count' in group:
                    to_browse_obj['package_count'] = group['package_count']
                if 'display_name' in group:
                    to_browse_obj['display_name'] = group['display_name']
                if 'image_url' in group and group['image_url']:
                    to_browse_obj['url'] = group['image_url']
                else:
                    to_browse_obj['url'] = get_url_to_icon_for_ckan_entity(group['name'], 'group')
                to_browse_groups.append(to_browse_obj)
            except (logic.NotFound, logic.ValidationError, logic.NotAuthorized) as error:
                # A failing group is skipped, not fatal.
                log.warn("Error on putting group: %s" % error)
        # The summary used to say "organisation/s" although groups are listed.
        log.info("browse %d" % len(ordered_groups) + " group/s")
    except (logic.NotFound, logic.ValidationError, logic.NotAuthorized) as error:
        log.error("Error on getting group: %s" % error)
        return []
    return to_browse_groups
def get_browse_info_for_organisations_or_groups(type='organization', limit=10, sort_field=None):
    '''Return the browse info for organizations or groups.

    :param type: 'organization' or 'group'; anything else yields ``[]``
    :param limit: forwarded to the listing function
    :param sort_field: forwarded as sort criterion when truthy, otherwise
        the listing function's own default sort is used
    '''
    if type == 'organization':
        if not sort_field:
            return get_list_of_organizations(limit)
        return get_list_of_organizations(limit, sort_field)
    if type == 'group':
        if not sort_field:
            return get_list_of_groups(limit)
        return get_list_of_groups(limit, sort_field)
    return []
def get_image_display_for_group(item_id):
    '''Return the image URL to show for a group.

    :param item_id: value handed to ``model.Group.get``
    :returns: the group's ``image_url`` when set, else the static
        placeholder image URL; None when ``item_id`` is falsy or the
        lookup raises
    '''
    if not item_id:
        return None
    try:
        group = model.Group.get(item_id)
        has_image = group and group.image_url
        if has_image:
            return group.image_url
        return h.url_for_static('/images/groups/icon/placeholder-group.png')
    except Exception as error:
        log.error("Error on getting item obj: %s" % item_id + "error: %s" % error)
def get_application_path():
    '''Return the directory the application runs from.

    When ``sys.frozen`` is set (application run as a PyInstaller bundle)
    this is ``sys._MEIPASS``; otherwise it is the directory containing
    this module.
    '''
    running_as_bundle = getattr(sys, 'frozen', False)
    if running_as_bundle:
        # The bundle's bootloader sets frozen=True and stores the app path
        # in _MEIPASS.
        return sys._MEIPASS
    module_file = os.path.abspath(__file__)
    return os.path.dirname(module_file)
def get_url_to_icon_for_ckan_entity(item_name, entity_type=None, default_placeholder=True):
    '''Return the static URL of the icon for a CKAN entity.

    The icon is looked up on disk as
    ``<application path>/public/images/<dir>/icon/<item_name lowercased>.png``
    where ``<dir>`` depends on the entity type.

    :param item_name: entity name the icon file is named after
    :param entity_type: 'group', 'organization' or 'type'
    :param default_placeholder: if True the URL of the placeholder image is
        returned when no icon file exists, otherwise None
    :returns: a static URL, or None when an argument is missing, the entity
        type is unknown, or no icon exists and no placeholder is wanted
    '''
    if not (entity_type and item_name):
        return None

    if entity_type == 'group':
        subdir, placeholder_icon = "/groups", "placeholder-group.png"
    elif entity_type == 'organization':
        subdir, placeholder_icon = "/organisations", "placeholder-organization.png"
    elif entity_type == 'type':
        subdir, placeholder_icon = "/types", "placeholder-type.png"
    else:
        return None

    dir_images_relative_path = "/images" + subdir
    dir_images_full_path = get_application_path() + "/public/images" + subdir
    icon_file = item_name.lower() + ".png"

    if os.path.isfile(dir_images_full_path + "/icon/" + icon_file):
        return h.url_for_static(dir_images_relative_path + "/icon/" + icon_file)
    if default_placeholder:
        return h.url_for_static(dir_images_relative_path + "/icon/" + placeholder_icon)
    return None
def get_user_info(user_id_or_name):
    '''Return the user object for an id or name.

    :param user_id_or_name: value handed to ``model.User.get``
    :returns: the user object, or None when the argument is falsy, no user
        is found, or the lookup raises
    '''
    if not user_id_or_name:
        return None
    try:
        user = model.User.get(user_id_or_name)
        return user if user else None
    except Exception as error:
        log.error("Error on getting item obj: %s" % user_id_or_name + "error: %s" % error)
    return None
def get_ckan_translate_for(ckan_po_file, my_search_string):
    '''Look up the translation of ``my_search_string`` in a ckan.po file.

    The file is scanned line by line; every line that follows a line
    containing the search string is collected, and the first double-quoted
    string found in the collected text is taken as the translation. Results
    are cached in the session under ``TRANSLATE_OF_ + my_search_string``.

    :param ckan_po_file: path of the .po file; when falsy the en_gcube
        ckan.po of the default CKAN installation is used
    :param my_search_string: text to search for
    :returns: ``""`` for an empty search string, the translation when
        found, otherwise the (falsy) value read from the session
    '''
    # Check the argument before it is used to build the session key.
    if not my_search_string:
        return ""
    session_key = TRANSLATE_OF_ + my_search_string
    my_translate = session.get(session_key)
    if my_translate:
        log.info("Translate of '%s' " % my_search_string + " found in session as: %s" % my_translate)
        return my_translate

    if not ckan_po_file:
        ckan_po_file = "/usr/lib/ckan/default/src/ckan/ckan/i18n/en_gcube/LC_MESSAGES/ckan.po"

    numlines = 0
    numfound = 0
    previous_line_matched = False
    collected = []
    try:
        # ``with`` closes the file even when reading fails half-way.
        with open(ckan_po_file, "r") as infile:
            for line in infile:
                numlines += 1
                if previous_line_matched:
                    # The line after a match is the candidate translation.
                    numfound += 1
                    collected.append(str(line))
                previous_line_matched = my_search_string in line
                if previous_line_matched:
                    log.debug("The search string '%s'" % my_search_string + " was found. Read the line: %s" % str(line))
    except Exception as error:
        # Best effort: an unreadable file simply yields no translation.
        log.error("Exception during parsing the file %s: %s", ckan_po_file, error)
    line_text = "".join(collected)

    log.info("Recap: '%s' was found" % my_search_string + " %i times " % numfound + "in %i lines" % numlines)
    log.debug("Line text is: %s" % line_text)

    match = re.search(r'"([A-Za-z0-9_ \./\-]*)"', line_text)
    if match is None:
        log.info("Pattern %s not found", my_search_string)
    else:
        my_translate = match.group()

    if my_translate:
        log.debug("Replacing quotas...")
        my_translate = my_translate.replace("\"", "")
        log.info("Found the string '%s'" % my_translate + " that translating '%s'" % my_search_string)
        session[session_key] = my_translate
        session.save()
    return my_translate
def get_location_to_bboxes():
    '''Read public/location_to_bboxes.ini and return its options sorted by
    name.

    Option names keep their case; every "," in a value is replaced by
    "%2C". Options of all sections end up in one mapping.

    :returns: an ``OrderedDict`` of option name to value, or None when
        reading fails
    '''
    parser = ConfigParser.ConfigParser()
    # Keep option names as written instead of lower-casing them.
    parser.optionxform = str
    bboxes_by_location = {}
    try:
        bboxes_file = get_application_path() + "/public/location_to_bboxes.ini"
        log.debug("bboxes_file is: '%s'" % bboxes_file)
        parser.read(bboxes_file)
        for section_name in parser.sections():
            log.debug('Location to bboxes Section: ' + section_name)
            bboxes_by_location.update(
                (name, value.replace(",", "%2C"))
                for name, value in parser.items(section_name)
            )
        ordered_bboxes = collections.OrderedDict(sorted(bboxes_by_location.items()))
        log.debug("Ordered 'bboxes_file' dict: '%s'" % ordered_bboxes)
        return ordered_bboxes
    except Exception as error:
        log.error("Error on reading file: %s" % bboxes_file + "error: %s" % error)
def get_content_moderator_system_placeholder():
    '''Return the module-level ``systemtype_cms_fields_placeholders`` dict
    (the same object, not a copy).'''
    placeholders = systemtype_cms_fields_placeholders
    return placeholders