From 9a9cc6a1ddcc929b413ef44040ec5296c604548f Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Tue, 4 Apr 2023 11:40:58 +0200 Subject: [PATCH 1/2] changed the way the tar archive is build to support renaming in case we need to change .tt.gz into .json.gz --- .../eu/dnetlib/dhp/common/MakeTarArchive.java | 35 +++++++++++++------ .../dhp/common/input_maketar_parameters.json | 6 ++++ 2 files changed, 31 insertions(+), 10 deletions(-) diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/MakeTarArchive.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/MakeTarArchive.java index eca433e9e..e1c79091b 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/common/MakeTarArchive.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/MakeTarArchive.java @@ -45,16 +45,24 @@ public class MakeTarArchive implements Serializable { .map(Integer::valueOf) .orElse(10); + final boolean rename = Optional + .ofNullable(parser.get("rename")) + .map(Boolean::valueOf) + .orElse(Boolean.FALSE); + Configuration conf = new Configuration(); conf.set("fs.defaultFS", hdfsNameNode); FileSystem fileSystem = FileSystem.get(conf); - makeTArArchive(fileSystem, inputPath, outputPath, gBperSplit); + makeTArArchive(fileSystem, inputPath, outputPath, gBperSplit, rename); } - - public static void makeTArArchive(FileSystem fileSystem, String inputPath, String outputPath, int gBperSplit) + public static void makeTArArchive(FileSystem fileSystem, String inputPath, String outputPath, int gBperSplit) throws IOException{ + makeTArArchive(fileSystem,inputPath,outputPath,gBperSplit,false); + } + public static void makeTArArchive(FileSystem fileSystem, String inputPath, String outputPath, int gBperSplit, + boolean rename) throws IOException { RemoteIterator dirIterator = fileSystem.listLocatedStatus(new Path(inputPath)); @@ -66,7 +74,7 @@ public class MakeTarArchive implements Serializable { String pathString = p.toString(); String entity = pathString.substring(pathString.lastIndexOf("/") + 1); - MakeTarArchive.tarMaxSize(fileSystem, pathString, outputPath + "/" + entity, entity, gBperSplit); + MakeTarArchive.tarMaxSize(fileSystem, pathString, outputPath + "/" + entity, entity, gBperSplit, rename); } } @@ -79,7 +87,8 @@ public class MakeTarArchive implements Serializable { return new TarArchiveOutputStream(fileSystem.create(hdfsWritePath).getWrappedStream()); } - private static void write(FileSystem fileSystem, String inputPath, String outputPath, String dirName) + private static void write(FileSystem fileSystem, String inputPath, String outputPath, String dirName, + boolean rename) throws IOException { Path hdfsWritePath = new Path(outputPath); @@ -95,20 +104,20 @@ public class MakeTarArchive implements Serializable { new Path(inputPath), true); while (iterator.hasNext()) { - writeCurrentFile(fileSystem, dirName, iterator, ar, 0); + writeCurrentFile(fileSystem, dirName, iterator, ar, 0, rename); } } } public static void tarMaxSize(FileSystem fileSystem, String inputPath, String outputPath, String dir_name, - int gBperSplit) throws IOException { + int gBperSplit, boolean rename) throws IOException { final long bytesPerSplit = 1024L * 1024L * 1024L * gBperSplit; long sourceSize = fileSystem.getContentSummary(new Path(inputPath)).getSpaceConsumed(); if (sourceSize < bytesPerSplit) { - write(fileSystem, inputPath, outputPath + ".tar", dir_name); + write(fileSystem, inputPath, outputPath + ".tar", dir_name, rename); } else { int partNum = 0; @@ -121,7 +130,8 @@ public class MakeTarArchive implements Serializable { long currentSize = 0; while (next && currentSize < bytesPerSplit) { - currentSize = writeCurrentFile(fileSystem, dir_name, fileStatusListIterator, ar, currentSize); + currentSize = writeCurrentFile( + fileSystem, dir_name, fileStatusListIterator, ar, currentSize, rename); next = fileStatusListIterator.hasNext(); } @@ -134,7 +144,7 @@ public class MakeTarArchive implements Serializable { private static long writeCurrentFile(FileSystem fileSystem, String dirName, RemoteIterator fileStatusListIterator, - TarArchiveOutputStream ar, long currentSize) throws IOException { + TarArchiveOutputStream ar, long currentSize, boolean rename) throws IOException { LocatedFileStatus fileStatus = fileStatusListIterator.next(); Path p = fileStatus.getPath(); @@ -148,6 +158,11 @@ public class MakeTarArchive implements Serializable { } name = tmp; } + if (rename) { + if (name.endsWith(".txt.gz")) + name = name.replace(".txt.gz", ".json.gz"); + } + TarArchiveEntry entry = new TarArchiveEntry(dirName + "/" + name); entry.setSize(fileStatus.getLen()); currentSize += fileStatus.getLen(); diff --git a/dhp-common/src/main/resources/eu/dnetlib/dhp/common/input_maketar_parameters.json b/dhp-common/src/main/resources/eu/dnetlib/dhp/common/input_maketar_parameters.json index a15318865..c57f67ebd 100644 --- a/dhp-common/src/main/resources/eu/dnetlib/dhp/common/input_maketar_parameters.json +++ b/dhp-common/src/main/resources/eu/dnetlib/dhp/common/input_maketar_parameters.json @@ -23,6 +23,12 @@ "paramLongName":"splitSize", "paramDescription": "the maximum size of the archive", "paramRequired": false + }, + { + "paramName":"rn", + "paramLongName":"rename", + "paramDescription": "if the file has to be renamed", + "paramRequired": false } ] From ecc05fe0f35cbd4326c0bb9ae3bd677290ae196c Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Wed, 5 Apr 2023 16:40:29 +0200 Subject: [PATCH 2/2] Added the code for the advancedConstraint implementation during the bulkTagging --- .../dhp/bulktag/community/Community.java | 9 ++ .../community/CommunityConfiguration.java | 14 ++ .../dhp/bulktag/community/ResultTagger.java | 152 ++++++++++++------ .../bulktag/community/TaggingConstants.java | 2 + 4 files changed, 125 insertions(+), 52 deletions(-) diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/Community.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/Community.java index 8e76b5778..208dc7dd2 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/Community.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/Community.java @@ -14,6 +14,7 @@ public class Community implements Serializable { private List subjects = new ArrayList<>(); private List providers = new ArrayList<>(); private List zenodoCommunities = new ArrayList<>(); + private SelectionConstraints constraints = new SelectionConstraints(); public String toJson() { final Gson g = new Gson(); @@ -57,4 +58,12 @@ public class Community implements Serializable { public void setZenodoCommunities(List zenodoCommunities) { this.zenodoCommunities = zenodoCommunities; } + + public SelectionConstraints getConstraints() { + return constraints; + } + + public void setConstraints(SelectionConstraints constraints) { + this.constraints = constraints; + } } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/CommunityConfiguration.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/CommunityConfiguration.java index 5d92f5ab6..66407a4b3 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/CommunityConfiguration.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/CommunityConfiguration.java @@ -24,6 +24,8 @@ public class CommunityConfiguration implements Serializable { private Map>> datasourceMap = new HashMap<>(); // map zenodocommunityid -> communityid private Map>> zenodocommunityMap = new HashMap<>(); + // map communityid -> selectionconstraints + private Map selectionConstraintsMap = new HashMap<>(); public Map>> getSubjectMap() { return subjectMap; @@ -51,6 +53,14 @@ public class CommunityConfiguration implements Serializable { this.zenodocommunityMap = zenodocommunityMap; } + public Map getSelectionConstraintsMap() { + return selectionConstraintsMap; + } + + public void setSelectionConstraintsMap(Map selectionConstraintsMap) { + this.selectionConstraintsMap = selectionConstraintsMap; + } + CommunityConfiguration(final Map communities) { this.communities = communities; init(); @@ -67,6 +77,9 @@ public class CommunityConfiguration implements Serializable { if (zenodocommunityMap == null) { zenodocommunityMap = Maps.newHashMap(); } + if (selectionConstraintsMap == null) { + selectionConstraintsMap = Maps.newHashMap(); + } for (Community c : getCommunities().values()) { // get subjects @@ -87,6 +100,7 @@ public class CommunityConfiguration implements Serializable { new Pair<>(id, zc.getSelCriteria()), zenodocommunityMap); } + selectionConstraintsMap.put(id, c.getConstraints()); } } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/ResultTagger.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/ResultTagger.java index 132006f5c..4a764acd7 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/ResultTagger.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/ResultTagger.java @@ -15,7 +15,11 @@ import com.google.gson.Gson; import com.jayway.jsonpath.DocumentContext; import com.jayway.jsonpath.JsonPath; +import eu.dnetlib.dhp.bulktag.SparkBulkTagJob; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; /** Created by miriam on 02/08/2018. */ public class ResultTagger implements Serializable { @@ -128,6 +132,25 @@ public class ResultTagger implements Serializable { communities.addAll(czenodo); + /* Tagging for Advanced Constraints */ + final Set aconstraints = new HashSet<>(); + + conf + .getSelectionConstraintsMap() + .keySet() + .forEach(communityId -> { + if (conf.getSelectionConstraintsMap().get(communityId) != null && + conf + .getSelectionConstraintsMap() + .get(communityId) + .getCriteria() + .stream() + .anyMatch(crit -> crit.verifyCriteria(param))) + aconstraints.add(communityId); + }); + + communities.addAll(aconstraints); + clearContext(result); /* Verify if there is something to bulktag */ @@ -136,7 +159,8 @@ public class ResultTagger implements Serializable { } result.getContext().forEach(c -> { - if (communities.contains(c.getId())) { + final String cId = c.getId(); + if (communities.contains(cId)) { Optional> opt_dataInfoList = Optional.ofNullable(c.getDataInfo()); List dataInfoList; if (opt_dataInfoList.isPresent()) @@ -145,30 +169,51 @@ public class ResultTagger implements Serializable { dataInfoList = new ArrayList<>(); c.setDataInfo(dataInfoList); } - if (subjects.contains(c.getId())) + if (subjects.contains(cId)) dataInfoList .add( - getDataInfo( - BULKTAG_DATA_INFO_TYPE, - CLASS_ID_SUBJECT, - CLASS_NAME_BULKTAG_SUBJECT, - TAGGING_TRUST)); - if (datasources.contains(c.getId())) + OafMapperUtils + .dataInfo( + false, BULKTAG_DATA_INFO_TYPE, true, false, + OafMapperUtils + .qualifier( + CLASS_ID_SUBJECT, CLASS_NAME_BULKTAG_SUBJECT, DNET_PROVENANCE_ACTIONS, + DNET_PROVENANCE_ACTIONS), + TAGGING_TRUST)); + if (datasources.contains(cId)) dataInfoList .add( - getDataInfo( - BULKTAG_DATA_INFO_TYPE, - CLASS_ID_DATASOURCE, - CLASS_NAME_BULKTAG_DATASOURCE, - TAGGING_TRUST)); - if (czenodo.contains(c.getId())) + OafMapperUtils + .dataInfo( + false, BULKTAG_DATA_INFO_TYPE, true, false, + OafMapperUtils + .qualifier( + CLASS_ID_DATASOURCE, CLASS_NAME_BULKTAG_DATASOURCE, DNET_PROVENANCE_ACTIONS, + DNET_PROVENANCE_ACTIONS), + TAGGING_TRUST)); + if (czenodo.contains(cId)) dataInfoList .add( - getDataInfo( - BULKTAG_DATA_INFO_TYPE, - CLASS_ID_CZENODO, - CLASS_NAME_BULKTAG_ZENODO, - TAGGING_TRUST)); + OafMapperUtils + .dataInfo( + false, BULKTAG_DATA_INFO_TYPE, true, false, + OafMapperUtils + .qualifier( + CLASS_ID_CZENODO, CLASS_NAME_BULKTAG_ZENODO, DNET_PROVENANCE_ACTIONS, + DNET_PROVENANCE_ACTIONS), + TAGGING_TRUST)); + if (aconstraints.contains(cId)) + dataInfoList + .add( + OafMapperUtils + .dataInfo( + false, BULKTAG_DATA_INFO_TYPE, true, false, + OafMapperUtils + .qualifier( + CLASS_ID_ADVANCED_CONSTRAINT, CLASS_NAME_BULKTAG_ADVANCED_CONSTRAINT, + DNET_PROVENANCE_ACTIONS, DNET_PROVENANCE_ACTIONS), + TAGGING_TRUST)); + } }); @@ -189,27 +234,48 @@ public class ResultTagger implements Serializable { if (subjects.contains(c)) dataInfoList .add( - getDataInfo( - BULKTAG_DATA_INFO_TYPE, - CLASS_ID_SUBJECT, - CLASS_NAME_BULKTAG_SUBJECT, - TAGGING_TRUST)); + OafMapperUtils + .dataInfo( + false, BULKTAG_DATA_INFO_TYPE, true, false, + OafMapperUtils + .qualifier( + CLASS_ID_SUBJECT, CLASS_NAME_BULKTAG_SUBJECT, DNET_PROVENANCE_ACTIONS, + DNET_PROVENANCE_ACTIONS), + TAGGING_TRUST)); if (datasources.contains(c)) dataInfoList .add( - getDataInfo( - BULKTAG_DATA_INFO_TYPE, - CLASS_ID_DATASOURCE, - CLASS_NAME_BULKTAG_DATASOURCE, - TAGGING_TRUST)); + OafMapperUtils + .dataInfo( + false, BULKTAG_DATA_INFO_TYPE, true, false, + OafMapperUtils + .qualifier( + CLASS_ID_DATASOURCE, CLASS_NAME_BULKTAG_DATASOURCE, + DNET_PROVENANCE_ACTIONS, DNET_PROVENANCE_ACTIONS), + TAGGING_TRUST)); if (czenodo.contains(c)) dataInfoList .add( - getDataInfo( - BULKTAG_DATA_INFO_TYPE, - CLASS_ID_CZENODO, - CLASS_NAME_BULKTAG_ZENODO, - TAGGING_TRUST)); + OafMapperUtils + .dataInfo( + false, BULKTAG_DATA_INFO_TYPE, true, false, + OafMapperUtils + .qualifier( + CLASS_ID_CZENODO, CLASS_NAME_BULKTAG_ZENODO, DNET_PROVENANCE_ACTIONS, + DNET_PROVENANCE_ACTIONS), + TAGGING_TRUST)); + if (aconstraints.contains(c)) + dataInfoList + .add( + OafMapperUtils + .dataInfo( + false, BULKTAG_DATA_INFO_TYPE, true, false, + OafMapperUtils + .qualifier( + CLASS_ID_ADVANCED_CONSTRAINT, CLASS_NAME_BULKTAG_ADVANCED_CONSTRAINT, + DNET_PROVENANCE_ACTIONS, DNET_PROVENANCE_ACTIONS), + TAGGING_TRUST)); + context.setDataInfo(dataInfoList); return context; }) @@ -219,22 +285,4 @@ public class ResultTagger implements Serializable { return result; } - public static DataInfo getDataInfo( - String inference_provenance, String inference_class_id, String inference_class_name, String trust) { - DataInfo di = new DataInfo(); - di.setInferred(true); - di.setInferenceprovenance(inference_provenance); - di.setProvenanceaction(getQualifier(inference_class_id, inference_class_name)); - di.setTrust(trust); - return di; - } - - public static Qualifier getQualifier(String inference_class_id, String inference_class_name) { - Qualifier pa = new Qualifier(); - pa.setClassid(inference_class_id); - pa.setClassname(inference_class_name); - pa.setSchemeid(DNET_PROVENANCE_ACTIONS); - pa.setSchemename(DNET_PROVENANCE_ACTIONS); - return pa; - } } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/TaggingConstants.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/TaggingConstants.java index 8274e26c9..aea21f8e5 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/TaggingConstants.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/TaggingConstants.java @@ -11,12 +11,14 @@ public class TaggingConstants { public static final String CLASS_ID_SUBJECT = "community:subject"; public static final String CLASS_ID_DATASOURCE = "community:datasource"; public static final String CLASS_ID_CZENODO = "community:zenodocommunity"; + public static final String CLASS_ID_ADVANCED_CONSTRAINT = "community:advconstraint"; public static final String ZENODO_COMMUNITY_INDICATOR = "zenodo.org/communities/"; public static final String CLASS_NAME_BULKTAG_SUBJECT = "Bulktagging for Community - Subject"; public static final String CLASS_NAME_BULKTAG_DATASOURCE = "Bulktagging for Community - Datasource"; public static final String CLASS_NAME_BULKTAG_ZENODO = "Bulktagging for Community - Zenodo"; + public static final String CLASS_NAME_BULKTAG_ADVANCED_CONSTRAINT = "Bulktagging for Community - Advanced Constraints"; public static final String TAGGING_TRUST = "0.8"; }