dnet-applications/apps/dhp-mdstore-manager/src/main/resources/zeppelin/analyzeYears.py

29 lines
756 B
Python

%pyspark
from pyspark.sql.functions import udf
from pyspark.sql.functions import *
from lxml import etree
from datetime import datetime
@udf("string")
def get_year(record):
    """Spark UDF: extract a 4-character year from an XML record.

    Parses *record* as XML, scans every element whose local name is
    'date' (any namespace), keeps the longest non-empty text value,
    and returns its first four characters (assumed to be the year,
    e.g. from an ISO date 'YYYY-MM-DD'). Returns None when the record
    contains no date text at all.
    """
    root = etree.fromstring(record.encode('utf-8'))
    c_date = None
    for item in root.xpath("//*[local-name()='date']"):
        text = item.text
        # Guard: empty elements like <date/> have text None; the original
        # code crashed with TypeError on len(None) in that case.
        if text is None:
            continue
        # Prefer the longest date string (a full date beats a bare year).
        if c_date is None or len(text) > len(c_date):
            c_date = text
    if c_date is not None:
        return c_date[:4]
    return None
df = spark.read.load(path)
result_per_year = df.select(df.id, get_year(df.body).alias('year')).groupBy('year').agg(count(df.id).alias('cnt')).collect()
print "%table"
print "year\tcount"
for item in result_per_year:
print "{}\t{}".format(item.year, item.cnt)