package eu.dnetlib.dhp.oa.graph.dump;

import java.io.Serializable;
import java.util.*;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import eu.dnetlib.dhp.eosc.model.*;
import eu.dnetlib.dhp.eosc.model.AccessRight;
import eu.dnetlib.dhp.eosc.model.Author;
import eu.dnetlib.dhp.eosc.model.Context;
import eu.dnetlib.dhp.eosc.model.GeoLocation;
import eu.dnetlib.dhp.eosc.model.Measure;
import eu.dnetlib.dhp.eosc.model.OpenAccessRoute;
import eu.dnetlib.dhp.eosc.model.Provenance;
import eu.dnetlib.dhp.eosc.model.Result;
import eu.dnetlib.dhp.oa.graph.dump.eosc.MasterDuplicate;
import eu.dnetlib.dhp.oa.graph.dump.exceptions.CardinalityTooHighException;
import eu.dnetlib.dhp.oa.graph.dump.exceptions.NoAvailableEntityTypeException;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;

/**
 * Converts a graph result ({@code eu.dnetlib.dhp.schema.oaf.Result}) into the dump model
 * ({@code eu.dnetlib.dhp.eosc.model.Result}).
 *
 * <p>All mapping logic lives in static methods; the class holds no state.
 */
public class ResultMapper implements Serializable {

    private static final Logger log = LoggerFactory.getLogger(ResultMapper.class);

    /**
     * Maps a graph entity onto the dump {@link Result}.
     *
     * @param in the graph entity; it is cast to {@code eu.dnetlib.dhp.schema.oaf.Result}
     * @param communityMap community identifier to community label; only contexts whose identifier is a key
     *            of this map are kept
     * @param eoscIds associations between graph datasource identifiers and EOSC identifiers, used to
     *            decorate the instances
     * @param <E> the type of the graph entity
     * @return the mapped result; an empty {@link Result} when the input has no result type; {@code null}
     *         when the input cannot be cast to the class implied by its result type
     * @throws NoAvailableEntityTypeException when the result type is not one of publication, dataset,
     *             software, other
     * @throws CardinalityTooHighException declared for callers; not thrown by the code in this class
     */
    public static <E extends eu.dnetlib.dhp.schema.oaf.OafEntity> Result map(
        E in, Map<String, String> communityMap, List<MasterDuplicate> eoscIds)
        throws NoAvailableEntityTypeException, CardinalityTooHighException {

        log.info("*****************{}", eoscIds.size());

        Result out = new Result();
        eu.dnetlib.dhp.schema.oaf.Result input = (eu.dnetlib.dhp.schema.oaf.Result) in;

        if (input.getResulttype() == null) {
            return out;
        }

        try {
            addTypeSpecificInformation(out, input, input.getResulttype());
            mapAuthor(out, input);
            mapAccessRight(out, input);
            mapContributor(out, input);
            mapCountry(out, input);
            mapCoverage(out, input);
            out.setDateofcollection(input.getDateofcollection());
            mapDescription(out, input);
            mapEmbargo(out, input);
            mapMeasure(out, input);
            mapFormat(out, input);
            out.setId(input.getId());
            mapOriginalId(out, input);
            mapInstance(out, input, eoscIds);
            mapLanguage(out, input);
            mapLastUpdateTimestamp(out, input);
            mapTitle(out, input);
            mapPid(out, input);
            mapAcceptanceDate(out, input);
            mapPublisher(out, input);
            mapSource(out, input);
            mapSubject(out, input);
            // overrides the class name set by addTypeSpecificInformation with the class id
            out.setType(input.getResulttype().getClassid());
            mapContext(communityMap, out, input);
            mapCollectedfrom(out, input);
        } catch (ClassCastException cce) {
            // the declared result type does not match the actual class of the entity: the record is dropped
            log.warn("Result {} does not match its declared result type, skipping it", input.getId(), cce);
            return null;
        }
        return out;
    }

    /** Copies the collectedfrom key/value pairs, when present. */
    private static void mapCollectedfrom(Result out, eu.dnetlib.dhp.schema.oaf.Result input) {
        if (input.getCollectedfrom() == null) {
            return;
        }
        out
            .setCollectedfrom(
                input
                    .getCollectedfrom()
                    .stream()
                    .map(cf -> CfHbKeyValue.newInstance(cf.getKey(), cf.getValue()))
                    .collect(Collectors.toList()));
    }

    /** Copies the full text values when the list is present and not empty. */
    private static void mapFulltext(Result out, eu.dnetlib.dhp.schema.oaf.Result input) {
        if (input.getFulltext() != null && !input.getFulltext().isEmpty()) {
            out.setFulltext(input.getFulltext().stream().map(ft -> ft.getValue()).collect(Collectors.toList()));
        }
    }

    /**
     * Maps the contexts whose (top level) identifier is a key of {@code communityMap}, keeping only the
     * first context for each distinct {@code hashCode()}.
     */
    private static void mapContext(Map<String, String> communityMap, Result out,
        eu.dnetlib.dhp.schema.oaf.Result input) {
        if (input.getContext() == null) {
            return;
        }

        List<Context> contexts = new ArrayList<>();
        Set<Integer> seenHashes = new HashSet<>();
        for (eu.dnetlib.dhp.schema.oaf.Context c : input.getContext()) {
            Context context = toContext(c, communityMap);
            if (context != null && seenHashes.add(context.hashCode())) {
                contexts.add(context);
            }
        }

        if (!contexts.isEmpty()) {
            out.setContext(contexts);
        }
    }

    /**
     * Builds the dump context for a graph context.
     *
     * @return the dump context, or {@code null} when the context has no identifier or its community is not
     *         a key of {@code communityMap}
     */
    private static Context toContext(eu.dnetlib.dhp.schema.oaf.Context c, Map<String, String> communityMap) {
        String communityId = c.getId();
        if (communityId == null) {
            return null;
        }
        // sub-community identifiers look like "community::sub": only the part before "::" is considered
        int separator = communityId.indexOf("::");
        if (separator >= 0) {
            communityId = communityId.substring(0, separator);
        }
        if (!communityMap.containsKey(communityId)) {
            return null;
        }

        Context context = new Context();
        context.setCode(communityId);
        context.setLabel(communityMap.get(communityId));

        if (c.getDataInfo() != null) {
            List<Provenance> provenance = new ArrayList<>(
                c
                    .getDataInfo()
                    .stream()
                    .map(
                        di -> Optional
                            .ofNullable(di.getProvenanceaction())
                            .map(pa -> Provenance.newInstance(pa.getClassname(), di.getTrust()))
                            .orElse(null))
                    .filter(Objects::nonNull)
                    .collect(Collectors.toSet()));
            try {
                context.setProvenance(getUniqueProvenance(provenance));
            } catch (NoAvailableEntityTypeException e) {
                // best effort: the context is kept without provenance
                log.warn("Unexpected provenance for context {}", communityId, e);
            }
        }
        return context;
    }

    /**
     * Maps subjects and keywords.
     * NOTE(review): the EOSC interoperability guidelines are mapped only when the subject list is present,
     * as in the original code - confirm this nesting is intended.
     */
    private static void mapSubject(Result out, eu.dnetlib.dhp.schema.oaf.Result input) {
        if (input.getSubject() == null) {
            return;
        }
        out.setSubject(createSubjectMap(input));
        out
            .setKeywords(
                input
                    .getSubject()
                    .stream()
                    .filter(
                        s -> s.getQualifier().getClassid().equalsIgnoreCase("keyword")
                            && !s.getValue().equalsIgnoreCase("EOSC::RO-crate"))
                    .map(s -> s.getValue())
                    .collect(Collectors.toList()));

        if (input.getEoscifguidelines() != null) {
            out
                .setEoscIF(
                    input
                        .getEoscifguidelines()
                        .stream()
                        .map(
                            eig -> EoscInteroperabilityFramework
                                .newInstance(
                                    eig.getCode(), eig.getLabel(), eig.getUrl(), eig.getSemanticRelation()))
                        .collect(Collectors.toList()));
        }
    }

    /** Copies the source values, when present. */
    private static void mapSource(Result out, eu.dnetlib.dhp.schema.oaf.Result input) {
        Optional
            .ofNullable(input.getSource())
            .ifPresent(
                value -> out.setSource(value.stream().map(Field::getValue).collect(Collectors.toList())));
    }

    /** Copies the publisher value, when present. */
    private static void mapPublisher(Result out, eu.dnetlib.dhp.schema.oaf.Result input) {
        if (input.getPublisher() != null) {
            out.setPublisher(input.getPublisher().getValue());
        }
    }

    /** Uses the date of acceptance, when present, as publication date. */
    private static void mapAcceptanceDate(Result out, eu.dnetlib.dhp.schema.oaf.Result input) {
        if (input.getDateofacceptance() != null) {
            out.setPublicationdate(input.getDateofacceptance().getValue());
        }
    }

    /** Maps the persistent identifiers of the result, when present. */
    private static void mapPid(Result out, eu.dnetlib.dhp.schema.oaf.Result input) {
        Optional
            .ofNullable(input.getPid())
            .ifPresent(
                value -> out
                    .setPid(
                        value
                            .stream()
                            .map(p -> ResultPid.newInstance(p.getQualifier().getClassid(), p.getValue()))
                            .collect(Collectors.toList())));
    }

    /** Sets main title and subtitle from the first title of the respective type, when there is one. */
    private static void mapTitle(Result out, eu.dnetlib.dhp.schema.oaf.Result input) {
        if (input.getTitle() == null) {
            return;
        }
        input
            .getTitle()
            .stream()
            .filter(t -> t.getQualifier().getClassid().equalsIgnoreCase("main title"))
            .findFirst()
            .ifPresent(t -> out.setMaintitle(t.getValue()));
        input
            .getTitle()
            .stream()
            .filter(t -> t.getQualifier().getClassid().equalsIgnoreCase("subtitle"))
            .findFirst()
            .ifPresent(t -> out.setSubtitle(t.getValue()));
    }

    /** Copies the last update timestamp, when present. */
    private static void mapLastUpdateTimestamp(Result out, eu.dnetlib.dhp.schema.oaf.Result input) {
        if (input.getLastupdatetimestamp() != null) {
            out.setLastupdatetimestamp(input.getLastupdatetimestamp());
        }
    }

    /** Maps the language qualifier (class id and class name), when present. */
    private static void mapLanguage(Result out, eu.dnetlib.dhp.schema.oaf.Result input) {
        if (input.getLanguage() != null) {
            out
                .setLanguage(
                    Language.newInstance(input.getLanguage().getClassid(), input.getLanguage().getClassname()));
        }
    }

    /** Maps every instance of the result, when the instance list is present. */
    private static void mapInstance(Result out, eu.dnetlib.dhp.schema.oaf.Result input,
        List<MasterDuplicate> eoscIds) {
        if (input.getInstance() == null) {
            return;
        }
        String resultType = input.getResulttype().getClassid();
        out
            .setInstance(
                input
                    .getInstance()
                    .stream()
                    .map(i -> getCommunityInstance(i, resultType, eoscIds))
                    .collect(Collectors.toList()));
    }

    /** Copies the original identifiers that do not start with {@code 50|}; empty list when absent. */
    private static void mapOriginalId(Result out, eu.dnetlib.dhp.schema.oaf.Result input) {
        out.setOriginalId(new ArrayList<>());
        Optional
            .ofNullable(input.getOriginalId())
            .ifPresent(
                ids -> out
                    .setOriginalId(
                        ids
                            .stream()
                            .filter(s -> !s.startsWith("50|"))
                            .collect(Collectors.toList())));
    }

    /** Copies the format values; empty list when absent. */
    private static void mapFormat(Result out, eu.dnetlib.dhp.schema.oaf.Result input) {
        out.setFormat(valuesOf(input.getFormat()));
    }

    /**
     * Maps the "downloads" and "views" measures onto the usage counts indicator, using the value of the
     * first unit of each measure. The indicator is set only when at least one count is available.
     */
    private static void mapMeasure(Result out, eu.dnetlib.dhp.schema.oaf.Result input) {
        if (input.getMeasures() == null) {
            return;
        }
        UsageCounts uc = new UsageCounts();
        input.getMeasures().forEach(m -> {
            if (m.getUnit() == null || m.getUnit().isEmpty()) {
                // nothing to read the count from
                return;
            }
            String value = m.getUnit().get(0).getValue();
            if ("downloads".equals(m.getId())) {
                uc.setDownloads(value);
            }
            if ("views".equals(m.getId())) {
                uc.setViews(value);
            }
        });
        if (!uc.isEmpty()) {
            Indicator indicator = new Indicator();
            indicator.setUsageCounts(uc);
            out.setIndicator(indicator);
        }
    }

    /** Copies the embargo end date, when present. */
    private static void mapEmbargo(Result out, eu.dnetlib.dhp.schema.oaf.Result input) {
        if (input.getEmbargoenddate() != null) {
            out.setEmbargoenddate(input.getEmbargoenddate().getValue());
        }
    }

    /** Copies the description values; empty list when absent. */
    private static void mapDescription(Result out, eu.dnetlib.dhp.schema.oaf.Result input) {
        out.setDescription(valuesOf(input.getDescription()));
    }

    /** Copies the coverage values; empty list when absent. */
    private static void mapCoverage(Result out, eu.dnetlib.dhp.schema.oaf.Result input) {
        out.setCoverage(valuesOf(input.getCoverage()));
    }

    /**
     * Maps the countries whose class id differs from {@code ModelConstants.UNKNOWN}. The provenance is set
     * only when the country carries a data info with a provenance action.
     */
    private static void mapCountry(Result out, eu.dnetlib.dhp.schema.oaf.Result input) {
        Optional
            .ofNullable(input.getCountry())
            .ifPresent(
                value -> out
                    .setCountry(
                        value
                            .stream()
                            .filter(c -> !ModelConstants.UNKNOWN.equals(c.getClassid()))
                            .map(c -> {
                                ResultCountry country = new ResultCountry();
                                country.setCode(c.getClassid());
                                country.setLabel(c.getClassname());
                                Optional
                                    .ofNullable(c.getDataInfo())
                                    .filter(di -> di.getProvenanceaction() != null)
                                    .ifPresent(
                                        di -> country
                                            .setProvenance(
                                                Provenance
                                                    .newInstance(
                                                        di.getProvenanceaction().getClassname(),
                                                        di.getTrust())));
                                return country;
                            })
                            .collect(Collectors.toList())));
    }

    /** Copies the contributor values; empty list when absent. */
    private static void mapContributor(Result out, eu.dnetlib.dhp.schema.oaf.Result input) {
        out.setContributor(valuesOf(input.getContributor()));
    }

    /** Returns the values of the given fields in order; an empty mutable list when {@code fields} is null. */
    private static List<String> valuesOf(List<Field<String>> fields) {
        List<String> values = new ArrayList<>();
        if (fields != null) {
            fields.forEach(f -> values.add(f.getValue()));
        }
        return values;
    }

    /** Maps the best access right when its class id is a key of {@code Constants.ACCESS_RIGHTS_COAR_MAP}. */
    private static void mapAccessRight(Result out, eu.dnetlib.dhp.schema.oaf.Result input) {
        // I do not map Access Right UNKNOWN or OTHER
        if (input.getBestaccessright() == null) {
            return;
        }
        String classId = input.getBestaccessright().getClassid();
        if (Constants.ACCESS_RIGHTS_COAR_MAP.containsKey(classId)) {
            String code = Constants.ACCESS_RIGHTS_COAR_MAP.get(classId);
            out
                .setBestaccessright(
                    BestAccessRight
                        .newInstance(
                            code,
                            Constants.COAR_CODE_LABEL_MAP.get(code),
                            Constants.COAR_ACCESS_RIGHT_SCHEMA));
        }
    }

    /** Maps the authors, when present. */
    private static void mapAuthor(Result out, eu.dnetlib.dhp.schema.oaf.Result input) {
        Optional
            .ofNullable(input.getAuthor())
            .ifPresent(
                ats -> out.setAuthor(ats.stream().map(ResultMapper::getAuthor).collect(Collectors.toList())));
    }

    /**
     * Groups the subjects by the lower-cased class id of their qualifier. Subjects whose class id is
     * "keyword", "eosc" or the ABS classification URL are left out.
     */
    private static Map<String, List<eu.dnetlib.dhp.eosc.model.Subject>> createSubjectMap(
        eu.dnetlib.dhp.schema.oaf.Result input) {
        Map<String, List<eu.dnetlib.dhp.eosc.model.Subject>> map = new HashMap<>();
        input.getSubject().forEach(s -> {
            String key = s.getQualifier().getClassid().toLowerCase();
            if (key.equalsIgnoreCase("http://www.abs.gov.au/ausstats/abs@.nsf/0/6BB427AB9696C225CA2574180004463E")
                || key.equalsIgnoreCase("keyword")
                || key.equalsIgnoreCase("eosc")) {
                return;
            }
            eu.dnetlib.dhp.eosc.model.Subject subject = new eu.dnetlib.dhp.eosc.model.Subject();
            subject.setValue(s.getValue());
            Provenance p = getProvenance(s);
            if (p != null) {
                subject.setProvenance(p);
            }
            map.computeIfAbsent(key, k -> new ArrayList<>()).add(subject);
        });
        return map;
    }

    /**
     * Sets the information that is specific to the type of the result (container, geolocation, code
     * repository, contact information, ...).
     *
     * @throws NoAvailableEntityTypeException when the class id of {@code ort} is not one of publication,
     *             dataset, software, other
     */
    private static void addTypeSpecificInformation(Result out, eu.dnetlib.dhp.schema.oaf.Result input,
        eu.dnetlib.dhp.schema.oaf.Qualifier ort)
        throws NoAvailableEntityTypeException {
        switch (ort.getClassid()) {
            case "publication":
                Journal j = ((Publication) input).getJournal();
                if (j != null) {
                    Container c = new Container();
                    c.setConferencedate(j.getConferencedate());
                    c.setConferenceplace(j.getConferenceplace());
                    c.setEdition(j.getEdition());
                    c.setEp(j.getEp());
                    c.setIss(j.getIss());
                    c.setIssnLinking(j.getIssnLinking());
                    c.setIssnOnline(j.getIssnOnline());
                    c.setIssnPrinted(j.getIssnPrinted());
                    c.setName(j.getName());
                    c.setSp(j.getSp());
                    c.setVol(j.getVol());
                    out.setContainer(c);
                    out.setType(ModelConstants.PUBLICATION_DEFAULT_RESULTTYPE.getClassname());
                }
                // mapFulltext ignores a missing or empty full text
                mapFulltext(out, input);
                break;
            case "dataset":
                Dataset id = (Dataset) input;
                Optional.ofNullable(id.getSize()).ifPresent(v -> out.setSize(v.getValue()));
                Optional.ofNullable(id.getVersion()).ifPresent(v -> out.setVersion(v.getValue()));
                out
                    .setGeolocation(
                        Optional
                            .ofNullable(id.getGeolocation())
                            .map(
                                igl -> igl
                                    .stream()
                                    .filter(Objects::nonNull)
                                    .map(gli -> {
                                        GeoLocation gl = new GeoLocation();
                                        gl.setBox(gli.getBox());
                                        gl.setPlace(gli.getPlace());
                                        gl.setPoint(gli.getPoint());
                                        return gl;
                                    })
                                    .collect(Collectors.toList()))
                            .orElse(null));
                out.setType(ModelConstants.DATASET_DEFAULT_RESULTTYPE.getClassname());
                break;
            case "software":
                Software is = (Software) input;
                Optional
                    .ofNullable(is.getCodeRepositoryUrl())
                    .ifPresent(value -> out.setCodeRepositoryUrl(value.getValue()));
                Optional
                    .ofNullable(is.getDocumentationUrl())
                    .ifPresent(
                        value -> out
                            .setDocumentationUrl(
                                value
                                    .stream()
                                    .map(Field::getValue)
                                    .collect(Collectors.toList())));
                Optional
                    .ofNullable(is.getProgrammingLanguage())
                    .ifPresent(value -> out.setProgrammingLanguage(value.getClassid()));
                out.setType(ModelConstants.SOFTWARE_DEFAULT_RESULTTYPE.getClassname());
                break;
            case "other":
                OtherResearchProduct ir = (OtherResearchProduct) input;
                out
                    .setContactgroup(
                        Optional
                            .ofNullable(ir.getContactgroup())
                            .map(value -> value.stream().map(Field::getValue).collect(Collectors.toList()))
                            .orElse(null));
                out
                    .setContactperson(
                        Optional
                            .ofNullable(ir.getContactperson())
                            .map(value -> value.stream().map(Field::getValue).collect(Collectors.toList()))
                            .orElse(null));
                out
                    .setTool(
                        Optional
                            .ofNullable(ir.getTool())
                            .map(value -> value.stream().map(Field::getValue).collect(Collectors.toList()))
                            .orElse(null));
                out.setType(ModelConstants.ORP_DEFAULT_RESULTTYPE.getClassname());
                // mapFulltext ignores a missing or empty full text
                mapFulltext(out, input);
                break;
            default:
                throw new NoAvailableEntityTypeException();
        }
    }

    /**
     * Maps an instance with the common values only.
     * NOTE(review): not referenced anywhere in this class - candidate for removal.
     */
    private static eu.dnetlib.dhp.eosc.model.Instance getGraphInstance(eu.dnetlib.dhp.schema.oaf.Instance i) {
        eu.dnetlib.dhp.eosc.model.Instance instance = new eu.dnetlib.dhp.eosc.model.Instance();
        setCommonValue(i, instance);
        return instance;
    }

    /**
     * Maps an instance adding, on top of the common values, the hosting datasource, the EOSC identifiers
     * associated to the hosting/collecting datasource and, for publications and other research products,
     * the full text.
     */
    private static eu.dnetlib.dhp.eosc.model.Instance getCommunityInstance(eu.dnetlib.dhp.schema.oaf.Instance i,
        String resultType, List<MasterDuplicate> eoscIds) {
        eu.dnetlib.dhp.eosc.model.Instance instance = new eu.dnetlib.dhp.eosc.model.Instance();
        setCommonValue(i, instance);

        String hostedbyKey = i.getHostedby() == null ? null : i.getHostedby().getKey();
        String collectedfromKey = i.getCollectedfrom() == null ? null : i.getCollectedfrom().getKey();

        if (i.getHostedby() != null) {
            instance.setHostedby(CfHbKeyValue.newInstance(hostedbyKey, i.getHostedby().getValue()));
        }

        List<String> eoscDsIds = eoscIds
            .stream()
            .filter(
                dm -> dm.getGraphId() != null
                    && (dm.getGraphId().equals(hostedbyKey) || dm.getGraphId().equals(collectedfromKey)))
            .map(dm -> dm.getEoscId())
            .collect(Collectors.toList());
        if (!eoscDsIds.isEmpty()) {
            instance.setEoscDsId(eoscDsIds);
        }

        if (("publication".equals(resultType) || "other".equals(resultType)) && i.getFulltext() != null) {
            instance.setFulltext(i.getFulltext());
        }
        return instance;
    }

    /**
     * Sets on {@code instance} the values that do not depend on the kind of dump: access right, measures,
     * pids, alternate identifiers, license, publication date, refereed, type, url and article processing
     * charge.
     */
    private static void setCommonValue(eu.dnetlib.dhp.schema.oaf.Instance i,
        eu.dnetlib.dhp.eosc.model.Instance instance) {
        eu.dnetlib.dhp.schema.oaf.AccessRight ar = i.getAccessright();
        if (ar != null && Constants.ACCESS_RIGHTS_COAR_MAP.containsKey(ar.getClassid())) {
            String code = Constants.ACCESS_RIGHTS_COAR_MAP.get(ar.getClassid());
            instance
                .setAccessright(
                    AccessRight
                        .newInstance(
                            code,
                            Constants.COAR_CODE_LABEL_MAP.get(code),
                            Constants.COAR_ACCESS_RIGHT_SCHEMA));

            // NOTE(review): measures are mapped only when the access right is mappable, as in the
            // original code - confirm this nesting is intended.
            if (i.getMeasures() != null) {
                List<Measure> measures = new ArrayList<>();
                i
                    .getMeasures()
                    .forEach(
                        m -> m.getUnit().forEach(u -> measures.add(Measure.newInstance(m.getId(), u.getValue()))));
                instance.setMeasures(measures);
            }

            if (ar.getOpenAccessRoute() != null) {
                switch (ar.getOpenAccessRoute()) {
                    case hybrid:
                        instance.getAccessright().setOpenAccessRoute(OpenAccessRoute.hybrid);
                        break;
                    case gold:
                        instance.getAccessright().setOpenAccessRoute(OpenAccessRoute.gold);
                        break;
                    case green:
                        instance.getAccessright().setOpenAccessRoute(OpenAccessRoute.green);
                        break;
                    case bronze:
                        instance.getAccessright().setOpenAccessRoute(OpenAccessRoute.bronze);
                        break;
                }
            }
        }

        Optional
            .ofNullable(i.getPid())
            .ifPresent(
                pid -> instance
                    .setPid(
                        pid
                            .stream()
                            .map(p -> ResultPid.newInstance(p.getQualifier().getClassid(), p.getValue()))
                            .collect(Collectors.toList())));
        Optional
            .ofNullable(i.getAlternateIdentifier())
            .ifPresent(
                ai -> instance
                    .setAlternateIdentifier(
                        ai
                            .stream()
                            .map(p -> AlternateIdentifier.newInstance(p.getQualifier().getClassid(), p.getValue()))
                            .collect(Collectors.toList())));
        Optional
            .ofNullable(i.getLicense())
            .ifPresent(value -> instance.setLicense(value.getValue()));
        Optional
            .ofNullable(i.getDateofacceptance())
            .ifPresent(value -> instance.setPublicationdate(value.getValue()));
        Optional
            .ofNullable(i.getRefereed())
            .ifPresent(value -> instance.setRefereed(value.getClassname()));
        Optional
            .ofNullable(i.getInstancetype())
            .ifPresent(value -> instance.setType(value.getClassname()));
        Optional.ofNullable(i.getUrl()).ifPresent(instance::setUrl);

        // the APC is set only when both the amount and the currency carry a non blank value
        String amount = Optional.ofNullable(i.getProcessingchargeamount()).map(Field::getValue).orElse(null);
        String currency = Optional.ofNullable(i.getProcessingchargecurrency()).map(Field::getValue).orElse(null);
        if (StringUtils.isNotBlank(amount) && StringUtils.isNotBlank(currency)) {
            APC apc = new APC();
            apc.setCurrency(currency);
            apc.setAmount(amount);
            instance.setArticleprocessingcharge(apc);
        }
    }

    /**
     * Reduces the given provenances to at most one per kind (inferred, harvested, user claim), keeping for
     * each kind the one selected by {@link #getHighestTrust(Provenance, Provenance)}.
     *
     * @throws NoAvailableEntityTypeException when a provenance is not one of {@code Constants.HARVESTED},
     *             {@code Constants.INFERRED}, {@code Constants.USER_CLAIM}
     */
    private static List<Provenance> getUniqueProvenance(List<Provenance> provenance)
        throws NoAvailableEntityTypeException {
        Provenance iProv = new Provenance();
        Provenance hProv = new Provenance();
        Provenance lProv = new Provenance();

        for (Provenance p : provenance) {
            switch (p.getProvenance()) {
                case Constants.HARVESTED:
                    hProv = getHighestTrust(hProv, p);
                    break;
                case Constants.INFERRED:
                    iProv = getHighestTrust(iProv, p);
                    // To be removed as soon as the new beta run has been done:
                    // this fixes the issue of trust not being set during bulktagging
                    if (StringUtils.isEmpty(iProv.getTrust())) {
                        iProv.setTrust(Constants.DEFAULT_TRUST);
                    }
                    break;
                case Constants.USER_CLAIM:
                    lProv = getHighestTrust(lProv, p);
                    break;
                default:
                    throw new NoAvailableEntityTypeException();
            }
        }

        // the placeholders that were never replaced have no provenance and are dropped here
        return Arrays
            .asList(iProv, hProv, lProv)
            .stream()
            .filter(p -> !StringUtils.isEmpty(p.getProvenance()))
            .collect(Collectors.toList());
    }

    /**
     * Chooses between the current best provenance and a candidate. When both have a trust, the current one
     * wins only if its trust is strictly greater; when only the current one has a trust it is kept;
     * otherwise the candidate is returned.
     * NOTE(review): trust values are compared as strings (lexicographic order) - confirm this is adequate
     * for the trust values in use.
     */
    private static Provenance getHighestTrust(Provenance hProv, Provenance p) {
        if (StringUtils.isNoneEmpty(hProv.getTrust(), p.getTrust())) {
            return hProv.getTrust().compareTo(p.getTrust()) > 0 ? hProv : p;
        }
        return (StringUtils.isEmpty(p.getTrust()) && !StringUtils.isEmpty(hProv.getTrust())) ? hProv : p;
    }

    /**
     * Builds the provenance of a structured property.
     *
     * @return the provenance, or {@code null} when the property has no data info or no provenance action
     */
    private static Provenance getProvenance(StructuredProperty s) {
        DataInfo di = s.getDataInfo();
        if (di == null || di.getProvenanceaction() == null) {
            return null;
        }
        Provenance p = new Provenance();
        p.setProvenance(di.getProvenanceaction().getClassname());
        p.setTrust(di.getTrust());
        return p;
    }

    /** Maps a graph author; the pid is set only when an ORCID can be selected by {@link #getOrcid(List)}. */
    private static Author getAuthor(eu.dnetlib.dhp.schema.oaf.Author oa) {
        Author a = new Author();
        a.setFullname(oa.getFullname());
        a.setName(oa.getName());
        a.setSurname(oa.getSurname());
        a.setRank(oa.getRank());

        if (oa.getPid() != null) {
            AuthorPid pid = getOrcid(oa.getPid());
            if (pid != null) {
                a.setPid(pid);
            }
        }
        return a;
    }

    /**
     * Builds the author pid for the given property. The provenance is {@code null} when the property has
     * no data info or no provenance action.
     */
    private static AuthorPid getAuthorPid(StructuredProperty pid) {
        DataInfo di = pid.getDataInfo();
        Provenance provenance = (di == null || di.getProvenanceaction() == null)
            ? null
            : Provenance.newInstance(di.getProvenanceaction().getClassname(), di.getTrust());
        return AuthorPid
            .newInstance(
                AuthorPidSchemeValue.newInstance(pid.getQualifier().getClassid(), pid.getValue()),
                provenance);
    }

    /**
     * Selects the ORCID of an author among its pids: the only ORCID-like pid (confirmed or pending) when
     * there is exactly one; otherwise the only confirmed one; otherwise the only pending one.
     *
     * @return the selected pid, or {@code null} when none can be selected unambiguously
     */
    private static AuthorPid getOrcid(List<StructuredProperty> p) {
        List<StructuredProperty> orcidLike = p
            .stream()
            .filter(
                pid -> pid.getQualifier().getClassid().equals(ModelConstants.ORCID)
                    || pid.getQualifier().getClassid().equals(ModelConstants.ORCID_PENDING))
            .collect(Collectors.toList());
        if (orcidLike.size() == 1) {
            return getAuthorPid(orcidLike.get(0));
        }

        List<StructuredProperty> confirmed = pidsOfType(orcidLike, ModelConstants.ORCID);
        if (confirmed.size() == 1) {
            return getAuthorPid(confirmed.get(0));
        }

        List<StructuredProperty> pending = pidsOfType(orcidLike, ModelConstants.ORCID_PENDING);
        if (pending.size() == 1) {
            return getAuthorPid(pending.get(0));
        }
        return null;
    }

    /** Returns the pids whose qualifier class id equals {@code classId}. */
    private static List<StructuredProperty> pidsOfType(List<StructuredProperty> pids, String classId) {
        return pids
            .stream()
            .filter(ap -> ap.getQualifier().getClassid().equals(classId))
            .collect(Collectors.toList());
    }
}