Merge pull request 'AdvancedConstraint' (#285) from advConstraintsInBeta into master

Reviewed-on: D-Net/dnet-hadoop#285
This commit is contained in:
Claudio Atzori 2023-04-06 09:24:54 +02:00
commit e093f04874
6 changed files with 156 additions and 62 deletions

View File

@ -45,16 +45,24 @@ public class MakeTarArchive implements Serializable {
.map(Integer::valueOf)
.orElse(10);
final boolean rename = Optional
.ofNullable(parser.get("rename"))
.map(Boolean::valueOf)
.orElse(Boolean.FALSE);
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfsNameNode);
FileSystem fileSystem = FileSystem.get(conf);
makeTArArchive(fileSystem, inputPath, outputPath, gBperSplit);
makeTArArchive(fileSystem, inputPath, outputPath, gBperSplit, rename);
}
public static void makeTArArchive(FileSystem fileSystem, String inputPath, String outputPath, int gBperSplit)
public static void makeTArArchive(FileSystem fileSystem, String inputPath, String outputPath, int gBperSplit) throws IOException{
makeTArArchive(fileSystem,inputPath,outputPath,gBperSplit,false);
}
public static void makeTArArchive(FileSystem fileSystem, String inputPath, String outputPath, int gBperSplit,
boolean rename)
throws IOException {
RemoteIterator<LocatedFileStatus> dirIterator = fileSystem.listLocatedStatus(new Path(inputPath));
@ -66,7 +74,7 @@ public class MakeTarArchive implements Serializable {
String pathString = p.toString();
String entity = pathString.substring(pathString.lastIndexOf("/") + 1);
MakeTarArchive.tarMaxSize(fileSystem, pathString, outputPath + "/" + entity, entity, gBperSplit);
MakeTarArchive.tarMaxSize(fileSystem, pathString, outputPath + "/" + entity, entity, gBperSplit, rename);
}
}
@ -79,7 +87,8 @@ public class MakeTarArchive implements Serializable {
return new TarArchiveOutputStream(fileSystem.create(hdfsWritePath).getWrappedStream());
}
private static void write(FileSystem fileSystem, String inputPath, String outputPath, String dirName)
private static void write(FileSystem fileSystem, String inputPath, String outputPath, String dirName,
boolean rename)
throws IOException {
Path hdfsWritePath = new Path(outputPath);
@ -95,20 +104,20 @@ public class MakeTarArchive implements Serializable {
new Path(inputPath), true);
while (iterator.hasNext()) {
writeCurrentFile(fileSystem, dirName, iterator, ar, 0);
writeCurrentFile(fileSystem, dirName, iterator, ar, 0, rename);
}
}
}
public static void tarMaxSize(FileSystem fileSystem, String inputPath, String outputPath, String dir_name,
int gBperSplit) throws IOException {
int gBperSplit, boolean rename) throws IOException {
final long bytesPerSplit = 1024L * 1024L * 1024L * gBperSplit;
long sourceSize = fileSystem.getContentSummary(new Path(inputPath)).getSpaceConsumed();
if (sourceSize < bytesPerSplit) {
write(fileSystem, inputPath, outputPath + ".tar", dir_name);
write(fileSystem, inputPath, outputPath + ".tar", dir_name, rename);
} else {
int partNum = 0;
@ -121,7 +130,8 @@ public class MakeTarArchive implements Serializable {
long currentSize = 0;
while (next && currentSize < bytesPerSplit) {
currentSize = writeCurrentFile(fileSystem, dir_name, fileStatusListIterator, ar, currentSize);
currentSize = writeCurrentFile(
fileSystem, dir_name, fileStatusListIterator, ar, currentSize, rename);
next = fileStatusListIterator.hasNext();
}
@ -134,7 +144,7 @@ public class MakeTarArchive implements Serializable {
private static long writeCurrentFile(FileSystem fileSystem, String dirName,
RemoteIterator<LocatedFileStatus> fileStatusListIterator,
TarArchiveOutputStream ar, long currentSize) throws IOException {
TarArchiveOutputStream ar, long currentSize, boolean rename) throws IOException {
LocatedFileStatus fileStatus = fileStatusListIterator.next();
Path p = fileStatus.getPath();
@ -148,6 +158,11 @@ public class MakeTarArchive implements Serializable {
}
name = tmp;
}
if (rename) {
if (name.endsWith(".txt.gz"))
name = name.replace(".txt.gz", ".json.gz");
}
TarArchiveEntry entry = new TarArchiveEntry(dirName + "/" + name);
entry.setSize(fileStatus.getLen());
currentSize += fileStatus.getLen();

View File

@ -23,6 +23,12 @@
"paramLongName":"splitSize",
"paramDescription": "the maximum size of the archive",
"paramRequired": false
},
{
"paramName":"rn",
"paramLongName":"rename",
"paramDescription": "if the file has to be renamed",
"paramRequired": false
}
]

View File

@ -14,6 +14,7 @@ public class Community implements Serializable {
private List<String> subjects = new ArrayList<>();
private List<Provider> providers = new ArrayList<>();
private List<ZenodoCommunity> zenodoCommunities = new ArrayList<>();
private SelectionConstraints constraints = new SelectionConstraints();
public String toJson() {
final Gson g = new Gson();
@ -57,4 +58,12 @@ public class Community implements Serializable {
public void setZenodoCommunities(List<ZenodoCommunity> zenodoCommunities) {
this.zenodoCommunities = zenodoCommunities;
}
public SelectionConstraints getConstraints() {
return constraints;
}
public void setConstraints(SelectionConstraints constraints) {
this.constraints = constraints;
}
}

View File

@ -24,6 +24,8 @@ public class CommunityConfiguration implements Serializable {
private Map<String, List<Pair<String, SelectionConstraints>>> datasourceMap = new HashMap<>();
// map zenodocommunityid -> communityid
private Map<String, List<Pair<String, SelectionConstraints>>> zenodocommunityMap = new HashMap<>();
// map communityid -> selectionconstraints
private Map<String, SelectionConstraints> selectionConstraintsMap = new HashMap<>();
public Map<String, List<Pair<String, SelectionConstraints>>> getSubjectMap() {
return subjectMap;
@ -51,6 +53,14 @@ public class CommunityConfiguration implements Serializable {
this.zenodocommunityMap = zenodocommunityMap;
}
public Map<String, SelectionConstraints> getSelectionConstraintsMap() {
return selectionConstraintsMap;
}
public void setSelectionConstraintsMap(Map<String, SelectionConstraints> selectionConstraintsMap) {
this.selectionConstraintsMap = selectionConstraintsMap;
}
CommunityConfiguration(final Map<String, Community> communities) {
this.communities = communities;
init();
@ -67,6 +77,9 @@ public class CommunityConfiguration implements Serializable {
if (zenodocommunityMap == null) {
zenodocommunityMap = Maps.newHashMap();
}
if (selectionConstraintsMap == null) {
selectionConstraintsMap = Maps.newHashMap();
}
for (Community c : getCommunities().values()) {
// get subjects
@ -87,6 +100,7 @@ public class CommunityConfiguration implements Serializable {
new Pair<>(id, zc.getSelCriteria()),
zenodocommunityMap);
}
selectionConstraintsMap.put(id, c.getConstraints());
}
}

View File

@ -15,7 +15,11 @@ import com.google.gson.Gson;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import eu.dnetlib.dhp.bulktag.SparkBulkTagJob;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
/** Created by miriam on 02/08/2018. */
public class ResultTagger implements Serializable {
@ -128,6 +132,25 @@ public class ResultTagger implements Serializable {
communities.addAll(czenodo);
/* Tagging for Advanced Constraints */
final Set<String> aconstraints = new HashSet<>();
conf
.getSelectionConstraintsMap()
.keySet()
.forEach(communityId -> {
if (conf.getSelectionConstraintsMap().get(communityId) != null &&
conf
.getSelectionConstraintsMap()
.get(communityId)
.getCriteria()
.stream()
.anyMatch(crit -> crit.verifyCriteria(param)))
aconstraints.add(communityId);
});
communities.addAll(aconstraints);
clearContext(result);
/* Verify if there is something to bulktag */
@ -136,7 +159,8 @@ public class ResultTagger implements Serializable {
}
result.getContext().forEach(c -> {
if (communities.contains(c.getId())) {
final String cId = c.getId();
if (communities.contains(cId)) {
Optional<List<DataInfo>> opt_dataInfoList = Optional.ofNullable(c.getDataInfo());
List<DataInfo> dataInfoList;
if (opt_dataInfoList.isPresent())
@ -145,30 +169,51 @@ public class ResultTagger implements Serializable {
dataInfoList = new ArrayList<>();
c.setDataInfo(dataInfoList);
}
if (subjects.contains(c.getId()))
if (subjects.contains(cId))
dataInfoList
.add(
getDataInfo(
BULKTAG_DATA_INFO_TYPE,
CLASS_ID_SUBJECT,
CLASS_NAME_BULKTAG_SUBJECT,
OafMapperUtils
.dataInfo(
false, BULKTAG_DATA_INFO_TYPE, true, false,
OafMapperUtils
.qualifier(
CLASS_ID_SUBJECT, CLASS_NAME_BULKTAG_SUBJECT, DNET_PROVENANCE_ACTIONS,
DNET_PROVENANCE_ACTIONS),
TAGGING_TRUST));
if (datasources.contains(c.getId()))
if (datasources.contains(cId))
dataInfoList
.add(
getDataInfo(
BULKTAG_DATA_INFO_TYPE,
CLASS_ID_DATASOURCE,
CLASS_NAME_BULKTAG_DATASOURCE,
OafMapperUtils
.dataInfo(
false, BULKTAG_DATA_INFO_TYPE, true, false,
OafMapperUtils
.qualifier(
CLASS_ID_DATASOURCE, CLASS_NAME_BULKTAG_DATASOURCE, DNET_PROVENANCE_ACTIONS,
DNET_PROVENANCE_ACTIONS),
TAGGING_TRUST));
if (czenodo.contains(c.getId()))
if (czenodo.contains(cId))
dataInfoList
.add(
getDataInfo(
BULKTAG_DATA_INFO_TYPE,
CLASS_ID_CZENODO,
CLASS_NAME_BULKTAG_ZENODO,
OafMapperUtils
.dataInfo(
false, BULKTAG_DATA_INFO_TYPE, true, false,
OafMapperUtils
.qualifier(
CLASS_ID_CZENODO, CLASS_NAME_BULKTAG_ZENODO, DNET_PROVENANCE_ACTIONS,
DNET_PROVENANCE_ACTIONS),
TAGGING_TRUST));
if (aconstraints.contains(cId))
dataInfoList
.add(
OafMapperUtils
.dataInfo(
false, BULKTAG_DATA_INFO_TYPE, true, false,
OafMapperUtils
.qualifier(
CLASS_ID_ADVANCED_CONSTRAINT, CLASS_NAME_BULKTAG_ADVANCED_CONSTRAINT,
DNET_PROVENANCE_ACTIONS, DNET_PROVENANCE_ACTIONS),
TAGGING_TRUST));
}
});
@ -189,27 +234,48 @@ public class ResultTagger implements Serializable {
if (subjects.contains(c))
dataInfoList
.add(
getDataInfo(
BULKTAG_DATA_INFO_TYPE,
CLASS_ID_SUBJECT,
CLASS_NAME_BULKTAG_SUBJECT,
OafMapperUtils
.dataInfo(
false, BULKTAG_DATA_INFO_TYPE, true, false,
OafMapperUtils
.qualifier(
CLASS_ID_SUBJECT, CLASS_NAME_BULKTAG_SUBJECT, DNET_PROVENANCE_ACTIONS,
DNET_PROVENANCE_ACTIONS),
TAGGING_TRUST));
if (datasources.contains(c))
dataInfoList
.add(
getDataInfo(
BULKTAG_DATA_INFO_TYPE,
CLASS_ID_DATASOURCE,
CLASS_NAME_BULKTAG_DATASOURCE,
OafMapperUtils
.dataInfo(
false, BULKTAG_DATA_INFO_TYPE, true, false,
OafMapperUtils
.qualifier(
CLASS_ID_DATASOURCE, CLASS_NAME_BULKTAG_DATASOURCE,
DNET_PROVENANCE_ACTIONS, DNET_PROVENANCE_ACTIONS),
TAGGING_TRUST));
if (czenodo.contains(c))
dataInfoList
.add(
getDataInfo(
BULKTAG_DATA_INFO_TYPE,
CLASS_ID_CZENODO,
CLASS_NAME_BULKTAG_ZENODO,
OafMapperUtils
.dataInfo(
false, BULKTAG_DATA_INFO_TYPE, true, false,
OafMapperUtils
.qualifier(
CLASS_ID_CZENODO, CLASS_NAME_BULKTAG_ZENODO, DNET_PROVENANCE_ACTIONS,
DNET_PROVENANCE_ACTIONS),
TAGGING_TRUST));
if (aconstraints.contains(c))
dataInfoList
.add(
OafMapperUtils
.dataInfo(
false, BULKTAG_DATA_INFO_TYPE, true, false,
OafMapperUtils
.qualifier(
CLASS_ID_ADVANCED_CONSTRAINT, CLASS_NAME_BULKTAG_ADVANCED_CONSTRAINT,
DNET_PROVENANCE_ACTIONS, DNET_PROVENANCE_ACTIONS),
TAGGING_TRUST));
context.setDataInfo(dataInfoList);
return context;
})
@ -219,22 +285,4 @@ public class ResultTagger implements Serializable {
return result;
}
public static DataInfo getDataInfo(
String inference_provenance, String inference_class_id, String inference_class_name, String trust) {
DataInfo di = new DataInfo();
di.setInferred(true);
di.setInferenceprovenance(inference_provenance);
di.setProvenanceaction(getQualifier(inference_class_id, inference_class_name));
di.setTrust(trust);
return di;
}
public static Qualifier getQualifier(String inference_class_id, String inference_class_name) {
Qualifier pa = new Qualifier();
pa.setClassid(inference_class_id);
pa.setClassname(inference_class_name);
pa.setSchemeid(DNET_PROVENANCE_ACTIONS);
pa.setSchemename(DNET_PROVENANCE_ACTIONS);
return pa;
}
}

View File

@ -11,12 +11,14 @@ public class TaggingConstants {
public static final String CLASS_ID_SUBJECT = "community:subject";
public static final String CLASS_ID_DATASOURCE = "community:datasource";
public static final String CLASS_ID_CZENODO = "community:zenodocommunity";
public static final String CLASS_ID_ADVANCED_CONSTRAINT = "community:advconstraint";
public static final String ZENODO_COMMUNITY_INDICATOR = "zenodo.org/communities/";
public static final String CLASS_NAME_BULKTAG_SUBJECT = "Bulktagging for Community - Subject";
public static final String CLASS_NAME_BULKTAG_DATASOURCE = "Bulktagging for Community - Datasource";
public static final String CLASS_NAME_BULKTAG_ZENODO = "Bulktagging for Community - Zenodo";
public static final String CLASS_NAME_BULKTAG_ADVANCED_CONSTRAINT = "Bulktagging for Community - Advanced Constraints";
public static final String TAGGING_TRUST = "0.8";
}