import atexit
from app.metrics.pennsieve import get_download_count
from app.metrics.contentful import init_cf_cda_client, get_funded_projects_count, get_featured_datasets
from scripts.update_contentful_entries import update_all_events_sort_order, update_event_sort_order
from app.metrics.algolia import get_dataset_count, init_algolia_client, get_all_dataset_ids, get_all_dataset_uuids, get_associated_datasets
from app.metrics.ga import init_ga_reporting, get_ga_1year_sessions, init_gspread_client, append_contact, upload_file, init_drive_client
from scripts.update_featured_dataset_id import set_featured_dataset_id, get_featured_dataset_id_table_state
from scripts.update_protocol_metrics import update_protocol_metrics, get_protocol_metrics_table_state
from app.osparc.services import OSparcServices
import botocore
import markdown
import boto3
import hashlib
import hmac
import base64
import time
import hubspot
from hubspot.crm.contacts import ApiException
import json
import logging
import re
import requests
import uuid
from urllib.parse import urlparse
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.combining import OrTrigger
from apscheduler.triggers.date import DateTrigger
from apscheduler.triggers.interval import IntervalTrigger
from botocore.exceptions import ClientError
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta
from flask import Flask, abort, jsonify, request
from flask_cors import CORS
from flask_marshmallow import Marshmallow
from pennsieve import Pennsieve
from pennsieve2.direct import new_client
from pennsieve.base import UnauthorizedException as PSUnauthorizedException
from PIL import Image
from requests.auth import HTTPBasicAuth
from flask_caching import Cache
from app.scicrunch_requests import create_doi_query, create_filter_request, create_facet_query, create_doi_aggregate, create_title_query, \
create_identifier_query, create_pennsieve_identifier_query, create_field_query, create_request_body_for_curies, create_onto_term_query, \
create_multiple_doi_query, create_multiple_discoverId_query, create_anatomy_query, get_body_scaffold_dataset_id, \
create_multiple_mimetype_query, create_citations_query
from scripts.email_sender import EmailSender, feedback_email, issue_reporting_email, creation_request_confirmation_email, anbc_form_creation_request_confirmation_email, service_form_submission_request_confirmation_email
from threading import Lock
from xml.etree import ElementTree
from app.config import Config
from app.dbtable import AnnotationTable, MapTable, ScaffoldTable, FeaturedDatasetIdSelectorTable, ProtocolMetricsTable
from app.scicrunch_process_results import process_results, process_get_first_scaffold_info, reform_aggregation_results, \
reform_curies_results, reform_dataset_results, reform_related_terms, reform_anatomy_results
from app.serializer import ContactRequestSchema
from app.utilities import img_to_base64_str, get_path_from_mangled_list, get_extension
from app.osparc.osparc import start_simulation as do_start_simulation
from app.osparc.osparc import check_simulation as do_check_simulation
from app.biolucida_process_results import process_results as process_biolucida_results, process_result as process_biolucida_result
logging.basicConfig()
app = Flask(__name__)
log_level = Config.LOG_LEVEL.upper()
app.logger.setLevel(getattr(logging, log_level, logging.WARNING))
executor = ThreadPoolExecutor(max_workers=8)
# set environment variable
app.config["ENV"] = Config.DEPLOY_ENV
cache = Cache(app, config={'CACHE_TYPE': 'SimpleCache', 'CACHE_DEFAULT_TIMEOUT': 300})
CORS(app)
ma = Marshmallow(app)
email_sender = EmailSender()
ps = None
s3 = boto3.client(
"s3",
aws_access_key_id=Config.SPARC_PORTAL_AWS_KEY,
aws_secret_access_key=Config.SPARC_PORTAL_AWS_SECRET,
region_name="us-east-1",
)
biolucida_lock = Lock()
db_url = Config.DATABASE_URL
if db_url and db_url.startswith("postgres://"):
db_url = db_url.replace("postgres://", "postgresql://", 1)
try:
annotationtable = AnnotationTable(db_url)
except AttributeError:
annotationtable = None
try:
maptable = MapTable(db_url)
except AttributeError:
maptable = None
try:
scaffoldtable = ScaffoldTable(db_url)
except AttributeError:
scaffoldtable = None
try:
featuredDatasetIdSelectorTable = FeaturedDatasetIdSelectorTable(db_url)
except AttributeError:
featuredDatasetIdSelectorTable = None
try:
protocolMetricsTable = ProtocolMetricsTable(db_url)
except AttributeError:
protocolMetricsTable = None
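# Process-wide holder for the Biolucida authentication token, its expiry, and the pending-auth flag.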
class Biolucida(object):
_token = ''
_expiry_date = datetime.now() + timedelta(999999)
_pending_authentication = False
@staticmethod
def set_token(value):
Biolucida._token = value
def token(self):
return self._token
@staticmethod
def set_expiry_date(value):
Biolucida._expiry_date = value
def expiry_date(self):
return self._expiry_date
@staticmethod
def set_pending_authentication(value):
Biolucida._pending_authentication = value
def pending_authentication(self):
return self._pending_authentication
@app.errorhandler(404)
def resource_not_found(e):
return jsonify(error=str(e)), 404
@app.before_first_request
def connect_to_pennsieve():
global ps
try:
ps = Pennsieve(
api_token=Config.PENNSIEVE_API_TOKEN,
api_secret=Config.PENNSIEVE_API_SECRET,
env_override=False,
host=Config.PENNSIEVE_API_HOST
)
except requests.exceptions.HTTPError as err:
logging.error("Unable to connect to Pennsieve host")
logging.error(err)
except PSUnauthorizedException as err:
logging.error("Unable to authorise with Pennsieve Api")
logging.error(err)
except Exception as err:
logging.error("Unknown Error")
logging.error(err)
@app.before_first_request
def connect_to_pennsieve2():
global ps2
try:
ps2 = new_client(api_key=Config.PENNSIEVE_API_TOKEN, api_secret=Config.PENNSIEVE_API_SECRET, api_host=Config.PENNSIEVE_API_HOST, api2_host=None)
except Exception as err:
logging.error(f"Error connecting to pennsieve 2 agent: {err}")
viewers_scheduler = BackgroundScheduler()
metrics_scheduler = BackgroundScheduler()
services_scheduler = BackgroundScheduler()
featured_dataset_id_scheduler = BackgroundScheduler()
update_contentful_event_entries_scheduler = BackgroundScheduler()
protocol_metrics_scheduler = BackgroundScheduler()
# If nothing is stored in the DB then update it now
protocol_metrics = get_protocol_metrics_table_state(protocolMetricsTable)
if Config.SPARC_API_DEBUGGING == 'FALSE' and (protocol_metrics is None or protocol_metrics.get('total_protocol_views') == -1):
update_protocol_metrics()
if not protocol_metrics_scheduler.running:
logging.info('Starting scheduler for protocol metrics acquisition')
protocol_metrics_scheduler.start()
if not featured_dataset_id_scheduler.running:
logging.info('Starting scheduler for featured dataset id acquisition')
featured_dataset_id_scheduler.start()
# Run monthly annotation states clean up
if annotationtable:
annotation_cleanup_scheduler = BackgroundScheduler()
annotation_cleanup_scheduler.start()
# Check on the second of each month at 2am
annotation_cleanup_scheduler.add_job(annotationtable.removeExpiredState, 'cron',
year='*', month='*', day='2', hour='2', minute=0, second=0)
# Only need to run the update contentful entries scheduler on one environment, so dev was chosen to keep prod more responsive
if Config.DEPLOY_ENV == 'development' and Config.SPARC_API_DEBUGGING == 'FALSE':
if not update_contentful_event_entries_scheduler.running:
logging.info('Starting scheduler for updating contentful event entries')
update_contentful_event_entries_scheduler.start()
# Update the contentful entries daily at 2 AM EST
update_contentful_event_entries_scheduler.add_job(update_all_events_sort_order, 'cron', hour=2, timezone='US/Eastern')
osparc_data = {}
@app.before_first_request
def get_osparc_file_viewers():
logging.info('Getting oSPARC viewers')
# Gets a list of default viewers.
try:
req = requests.get(url=f'{Config.OSPARC_API_HOST}/viewers')
if req.ok and 'application/json' in req.headers.get('content-type', ''):
viewers = req.json()
table = build_filetypes_table(viewers["data"])
osparc_data["file_viewers"] = table
except Exception as e:
        logging.error(f'Could not retrieve oSPARC viewers: {e}')
if not viewers_scheduler.running:
logging.info('Starting scheduler for oSPARC viewers acquisition')
viewers_scheduler.start()
usage_metrics = {}
google_analytics = init_ga_reporting()
algolia = init_algolia_client()
contentful = init_cf_cda_client()
@app.before_first_request
def get_metrics():
logging.info('Gathering metrics data')
if google_analytics:
ga_response = get_ga_1year_sessions(google_analytics)
usage_metrics['1year_sessions_count'] = ga_response
if algolia:
algolia_response = get_dataset_count(algolia)
usage_metrics['dataset_count'] = algolia_response
if contentful:
cf_response = get_funded_projects_count(contentful)
usage_metrics['funded_projects_count'] = cf_response
ps_response = get_download_count()
usage_metrics['1year_download_count'] = ps_response
if not metrics_scheduler.running:
logging.info('Starting scheduler for metrics acquisition')
metrics_scheduler.start()
osparc_services = OSparcServices()
@app.before_first_request
def get_services():
logging.info('Fetching oSPARC services')
try:
req = requests.get(url=f'{Config.OSPARC_API_HOST}/services')
services_resp = req.json()
osparc_services.set_services(services_resp['data'])
except Exception as e:
        logging.error(f'Request to get oSPARC services failed: {e}')
# Gets oSPARC services before the first request after startup and then once a day.
services_scheduler.add_job(func=get_services, trigger="interval", days=1)
# Gets oSPARC viewers before the first request after startup and then once a day.
viewers_scheduler.add_job(func=get_osparc_file_viewers, trigger="interval", days=1)
# Gathers all the required metrics, once every three hours
metrics_scheduler.add_job(func=get_metrics, trigger='interval', hours=3)
# Update the featured dataset id on deploy and then every hour
featured_dataset_id_trigger = OrTrigger([DateTrigger(), IntervalTrigger(hours=1)])
featured_dataset_id_scheduler.add_job(lambda: set_featured_dataset_id(featuredDatasetIdSelectorTable), featured_dataset_id_trigger)
# Update the protocol metrics once a week on saturday at midnight
if Config.SPARC_API_DEBUGGING == 'FALSE':
protocol_metrics_scheduler.add_job(update_protocol_metrics, 'cron', day_of_week='sat', hour=0, minute=0)
def shutdown_schedulers():
logging.info('Stopping scheduler for oSPARC viewers acquisition')
if viewers_scheduler.running:
viewers_scheduler.shutdown()
logging.info('Stopping scheduler for metrics acquisition')
if metrics_scheduler.running:
metrics_scheduler.shutdown()
logging.info('Stopping scheduler for updating contentful entries')
if update_contentful_event_entries_scheduler.running:
update_contentful_event_entries_scheduler.shutdown()
logging.info('Stopping scheduler for updating featured dataset id')
if featured_dataset_id_scheduler.running:
featured_dataset_id_scheduler.shutdown()
logging.info('Stopping scheduler for updating protocol metrics')
if protocol_metrics_scheduler.running:
protocol_metrics_scheduler.shutdown()
logging.info('Stopping scheduler for oSPARC services')
if services_scheduler.running:
services_scheduler.shutdown()
atexit.register(shutdown_schedulers)
@app.route("/health")
def health():
return json.dumps({"status": "healthy"})
@app.route("/contact", methods=["POST"])
def contact():
data = json.loads(request.data)
contact_request = ContactRequestSchema().load(data)
name = contact_request["name"]
email = contact_request["email"]
message = contact_request["message"]
email_sender.send_email(name, email, message)
email_sender.mailersend_email(Config.SES_SENDER, email, 'Feedback submission', feedback_email.substitute({'message': message}))
return json.dumps({"status": "sent"})
def create_s3_presigned_url(s3BucketName, key, content_type, expiration):
response = s3.generate_presigned_url(
"get_object",
Params={"Bucket": s3BucketName, "Key": key, "RequestPayer": "requester", "ResponseContentType": content_type},
ExpiresIn=expiration,
)
return response
# Download a file from S3
@app.route("/download")
def create_presigned_url(expiration=3600, bucket_name=Config.DEFAULT_S3_BUCKET_NAME):
key = request.args.get("key")
s3BucketName = request.args.get("s3BucketName", bucket_name)
content_type = request.args.get("contentType", "application/octet-stream")
return create_s3_presigned_url(s3BucketName, key, content_type, expiration)
@app.route("/thumbnail/neurolucida")
def thumbnail_from_neurolucida_file():
query_args = request.args
if 'version' not in query_args or 'datasetId' not in query_args or 'path' not in query_args:
return abort(400, description=f"Query arguments are not valid.")
url = f"{Config.NEUROLUCIDA_HOST}/thumbnail"
try:
response = requests.get(url, params=query_args, timeout=5)
response.raise_for_status()
if response.status_code == 200:
if response.headers.get('Content-Type', 'unknown') == 'image/png':
return base64.b64encode(response.content)
abort(400, 'Failed to retrieve thumbnail.')
except requests.exceptions.ConnectionError:
return abort(400, description="Unable to make a connection to NEUROLUCIDA_HOST.")
except requests.exceptions.Timeout:
return abort(504, 'Request to NEUROLUCIDA_HOST timed out.')
except requests.exceptions.RequestException as e:
return abort(502, f"Error while requesting NEUROLUCIDA_HOST: {str(e)}")
@app.route("/thumbnail/segmentation")
def extract_thumbnail_from_xml_file(bucket_name=Config.DEFAULT_S3_BUCKET_NAME):
"""
Extract a thumbnail from a mbf xml file.
First phase is to find the thumbnail element in the xml document.
Second phase is to convert the xml to a base64 png.
"""
query_args = request.args
if 'path' not in query_args:
return abort(400, description=f"Query arguments are not valid.")
s3BucketName = query_args.get("s3BucketName", bucket_name)
path = query_args['path']
resource = None
start_tag_found = False
end_tag_found = False
start_byte = 0
offset = 256000
end_byte = offset
while not start_tag_found or not end_tag_found:
try:
response = s3.get_object(
Bucket=s3BucketName,
Key=path,
Range=f"bytes={start_byte}-{end_byte}",
RequestPayer="requester"
)
except ClientError as ex:
if ex.response['Error']['Code'] == 'NoSuchKey':
return abort(404, description=f"Could not find file: '{path}'")
else:
return abort(404, description=f"Unknown error for file: '{path}'")
resource = response["Body"].read().decode('UTF-8')
        start_tag_found = '<thumbnail' in resource
        end_tag_found = '</thumbnail>' in resource
        if not start_tag_found or not end_tag_found:
            end_byte += offset

    thumbnail_xml = resource[resource.find('<thumbnail'):resource.find('</thumbnail>')] + '</thumbnail>'
xml = ElementTree.fromstring(thumbnail_xml)
size_info = xml.attrib
im_data = ''
for child in xml:
im_data += child.text[2:]
byte_im_data = bytes.fromhex(im_data)
im = Image.frombytes("RGB", (int(size_info['rows']), int(size_info['cols'])), byte_im_data)
base64_form = img_to_base64_str(im)
return base64_form
@app.route("/exists/")
def url_exists(path, bucket_name=Config.DEFAULT_S3_BUCKET_NAME):
query_args = request.args
s3BucketName = query_args.get("s3BucketName", bucket_name)
try:
head_response = s3.head_object(
Bucket=s3BucketName,
Key=path,
RequestPayer="requester"
)
except ClientError:
return {'exists': 'false'}
content_length = head_response.get('ContentLength', 0)
if content_length > 0:
return {'exists': 'true'}
return {'exists': 'false'}
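# Resolve a Pennsieve file URI (as returned by SciCrunch) to its Discover file metadata.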
def fetch_discover_file_information(uri):
# Fudge the URI from Sci-crunch
uri = uri.replace('https://api.pennsieve.io/', 'https://api.pennsieve.io/discover/')
uri = uri[:uri.rfind('/')]
r = requests.get(uri)
return r.json()
@app.route("/s3-resource/discover_path")
def get_discover_path():
uri = request.args.get('uri')
try:
json_response = fetch_discover_file_information(uri)
if 'totalCount' in json_response and json_response['totalCount'] == 1:
file_info = json_response['files'][0]
return file_info['path']
except Exception as ex:
        logging.error(f'Failed to retrieve uri {uri}: {ex}')
return abort(404, description=f'Failed to retrieve uri {uri}')
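# HEAD an S3 object and return a (status_code, message) tuple; aborts with 413 when the
# object exceeds Config.DIRECT_DOWNLOAD_LIMIT so it is not proxied directly.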
def s3_header_check(path, bucket_name):
try:
head_response = s3.head_object(
Bucket=bucket_name,
Key=path,
RequestPayer="requester"
)
content_length = head_response.get('ContentLength', Config.DIRECT_DOWNLOAD_LIMIT)
if content_length and not content_length < Config.DIRECT_DOWNLOAD_LIMIT: # 20 MB
return abort(413, description=f"File too big to download: {content_length}")
except botocore.exceptions.ClientError as err:
# NOTE: This case is required because of https://github.com/boto/boto3/issues/2442
if err.response["Error"]["Code"] == "404":
return (404, f'Provided path was not found on the s3 resource')
elif err.response["Error"]["Code"] == "403":
return (403, f'There is a permission issue when accessing the file at specified path')
else:
return abort(err.response["Error"]["Code"], err.response["Error"]["Message"])
else:
return (200, 'OK')
# Reverse proxy for objects from S3, a simple get object
# operation. This is used by scaffoldvuer and it is
# important to keep the path relative so that
# other required files can be accessed.
@app.route("/s3-resource/")
def direct_download_url(path, bucket_name=Config.DEFAULT_S3_BUCKET_NAME):
query_args = request.args
s3BucketName = query_args.get("s3BucketName", bucket_name)
s3_path = path # Will modify s3_path if we find name mangling
# Check the header to see if too large or does not exist
response = s3_header_check(path, s3BucketName)
# If the file does not exist, check if the name was mangled
if response[0] == 404 or response[0] == 403:
s3_path_modified = get_path_from_mangled_list(path)
if s3_path_modified == s3_path:
abort(404, description=f'Provided path was not found on the s3 resource') # Abort if path did not change
# Check the modified path
response2 = s3_header_check(s3_path_modified, s3BucketName)
if response2[0] == 200:
s3_path = s3_path_modified # Modify the path if de-mangling was successful
elif response2[0] == 404:
abort(404, description=f'Provided path was not found on the s3 resource')
elif response2[0] == 403:
abort(403, description=f'There is a permission issue when accessing the file at specified path')
response = s3.get_object(
Bucket=s3BucketName,
Key=s3_path,
RequestPayer="requester"
)
encode_base64 = request.args.get("encodeBase64")
resource = response["Body"].read()
if encode_base64 is not None:
return base64.b64encode(resource)
return resource
@app.route("/scicrunch-dataset//")
def sci_doi(doi1, doi2):
doi = doi1.replace('DOI:', '') + '/' + doi2
data = create_doi_query(doi)
try:
response = requests.post(
f'{Config.SCI_CRUNCH_HOST}/_search?api_key={Config.KNOWLEDGEBASE_KEY}',
json=data)
return response.json()
except requests.exceptions.HTTPError as err:
logging.error(err)
return json.dumps({'error': err})
# /pubmed/<id_>: Used as a proxy for making requests to pubmed
@app.route("/pubmed/<id_>")
@app.route("/pubmed/<id_>/")
def pubmed(id_):
try:
response = requests.get(f'https://pubmed.ncbi.nlm.nih.gov/{id_}/')
return response.text
except requests.exceptions.HTTPError as err:
logging.error(err)
return json.dumps({'error': err})
# /scicrunch-query-string/: Returns results for given organ curie. These can be processed by the sidebar
@app.route("/scicrunch-query-string/")
def sci_organ():
fields = request.args.getlist('field')
curie = request.args.get('curie')
size = request.args.get('size')
from_ = request.args.get('from')
data = create_field_query(fields, curie, size, from_)
try:
response = requests.post(
f'{Config.SCI_CRUNCH_HOST}/_search?api_key={Config.KNOWLEDGEBASE_KEY}',
json=data)
return process_results(response.json())
except requests.exceptions.HTTPError as err:
logging.error(err)
return json.dumps({'error': err})
@app.route("/dataset_info/using_doi")
def get_dataset_info_doi():
doi = request.args.get('doi')
raw = request.args.get('raw_response')
query = create_doi_query(doi)
if raw is None:
return reform_dataset_results(dataset_search(query))
return dataset_search(query)
@app.route("/dataset_info/using_multiple_dois")
@app.route("/dataset_info/using_multiple_dois/")
def get_dataset_info_dois():
dois = request.args.getlist('dois')
query = create_multiple_doi_query(dois)
return process_results(dataset_search(query))
@app.route("/multiple_dataset_info/using_multiple_mimetype")
@app.route("/multiple_dataset_info/using_multiple_mimetype/")
def get_file_info_from_mimetype():
# q here is a scicrunch query ie: "*jp2*+OR+*vnd.ome.xml*+OR+*jpx*"
q = request.args.getlist('q')
query = create_multiple_mimetype_query(q)
return process_results(dataset_search(query))
@app.route("/dataset_info/using_multiple_discoverIds")
@app.route("/dataset_info/using_multiple_discoverIds/")
def get_dataset_info_discoverIds():
discoverIds = request.args.getlist('discoverIds')
query = create_multiple_discoverId_query(discoverIds)
return process_results(dataset_search(query))
@app.route("/dataset_info/using_title")
def get_dataset_info_title():
title = request.args.get('title')
query = create_title_query(title)
return reform_dataset_results(dataset_search(query))
@app.route("/dataset_info/using_object_identifier")
def get_dataset_info_object_identifier():
identifier = request.args.get('identifier')
query = create_identifier_query(identifier)
return reform_dataset_results(dataset_search(query))
@app.route("/dataset_info/anatomy")
def get_dataset_info_anatomy():
identifier = request.args.get('identifier', -1)
if identifier == -1:
return abort(404, description=f'Identifier for API call not set.')
query = create_anatomy_query(identifier)
return reform_anatomy_results(dataset_search(query))
@app.route("/dataset_info/using_pennsieve_identifier")
def get_dataset_info_pennsieve_identifier():
identifier = request.args.get('identifier')
query = create_pennsieve_identifier_query(identifier)
return reform_dataset_results(dataset_search(query))
@app.route("/segmentation_info/")
def get_segmentation_info_from_file(bucket_name=Config.DEFAULT_S3_BUCKET_NAME):
query_args = request.args
if 'dataset_path' not in query_args:
return abort(400, description=f"Query arguments must include 'dataset_path'.")
s3BucketName = query_args.get("s3BucketName", bucket_name)
dataset_path = query_args.get('dataset_path')
try:
# Check the header to see if too large
response = s3_header_check(dataset_path, s3BucketName)
# Check if file exists
if response[0] == 404:
abort(404, description=f'Provided path was not found on the s3 resource')
response = s3.get_object(
Bucket=s3BucketName,
Key=dataset_path,
RequestPayer="requester"
)
except ClientError as ex:
if ex.response['Error']['Code'] == 'NoSuchKey':
return abort(404, description=f"Could not find file: '{dataset_path}'")
else:
return abort(404, description=f"Unknown error for file: '{dataset_path}'")
resource = response["Body"].read()
xml = ElementTree.fromstring(resource)
subject_element = xml.find('./{*}sparcdata/{*}subject')
info = {}
if subject_element is not None:
info['subject'] = subject_element.attrib
else:
info['subject'] = {'age': '', 'sex': '', 'species': '', 'subjectid': ''}
atlas_element = xml.find('./{*}sparcdata/{*}atlas')
if atlas_element is not None:
info['atlas'] = atlas_element.attrib
else:
info['atlas'] = {'organ': ''}
return info
@app.route("/current_doi_list")
def get_all_doi():
query = create_doi_aggregate()
results = reform_aggregation_results(dataset_search(query))
doi_results = []
for result in results['doi']['buckets']:
doi_results.append(result['key']['curie'])
return {'results': doi_results}
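# POST a prepared query body to the SciCrunch Elasticsearch endpoint and return the raw JSON response.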
def dataset_search(query):
try:
payload = query
params = {
"api_key": Config.KNOWLEDGEBASE_KEY
}
response = requests.post(f'{Config.SCI_CRUNCH_HOST}/_search',
json=payload, params=params)
return response.json()
except requests.exceptions.HTTPError as err:
logging.error(err)
return jsonify({'error': err})
# /search/: Returns sci-crunch results for a given query
@app.route("/search/", defaults={'query': '', 'limit': 10, 'start': 0})
@app.route("/search/")
def kb_search(query, limit=10, start=0):
try:
if request.args.get('limit') is not None:
limit = request.args.get('limit')
if request.args.get('query') is not None:
query = request.args.get('query')
if request.args.get('start') is not None:
start = request.args.get('start')
# print(f'{Config.SCI_CRUNCH_HOST}/_search?q={query}&size={limit}&from={start}&api_key={Config.KNOWLEDGEBASE_KEY}')
response = requests.get(f'{Config.SCI_CRUNCH_HOST}/_search?q={query}&size={limit}&from={start}&api_key={Config.KNOWLEDGEBASE_KEY}')
return process_results(response.json())
except requests.exceptions.HTTPError as err:
logging.error(err)
return json.dumps({'error': err})
# /filter-search/: Returns sci-crunch results with optional params for facet filtering, sizing, and pagination
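# Example (illustrative): GET /filter-search/?term=organ&facet=heart&size=10&start=0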
@app.route("/filter-search/", defaults={'query': ''})
@app.route("/filter-search//")
def filter_search(query):
terms = request.args.getlist('term')
facets = request.args.getlist('facet')
size = request.args.get('size')
start = request.args.get('start')
# Create request
data = create_filter_request(query, terms, facets, size, start)
# Send request to sci-crunch
try:
response = requests.post(
f'{Config.SCI_CRUNCH_HOST}/_search?api_key={Config.KNOWLEDGEBASE_KEY}',
json=data)
results = process_results(response.json())
except requests.exceptions.HTTPError as err:
logging.error(err)
return jsonify({'error': str(err), 'message': 'SciCrunch is not currently reachable, please try again later'}), 502
except json.JSONDecodeError:
return jsonify({'message': 'Could not parse SciCrunch output, please try again later',
'error': 'JSONDecodeError'}), 502
return results
# /get-facets/<type_>: Returns available sci-crunch facets for filtering over a given type ('species', 'gender', etc.)
@app.route("/get-facets/<type_>")
def get_facets(type_):
# Create facet query
type_map, data = create_facet_query(type_)
# Make a request for each sci-crunch parameter
results = []
for path in type_map[type_]:
data['aggregations'][f'{type_}']['terms']['field'] = path
try:
response = requests.post(
f'{Config.SCI_CRUNCH_HOST}/_search?api_key={Config.KNOWLEDGEBASE_KEY}',
json=data)
json_result = response.json()
results.append(json_result)
except json.JSONDecodeError:
return jsonify({'message': 'Could not parse SciCrunch output, please try again later',
'error': 'JSONDecodeError'}), 502
except Exception as ex:
logging.error(f"Could not search SciCrunch for path {path}", ex)
# Select terms from the results
terms = []
for result in results:
terms += result['aggregations'][f'{type_}']['buckets']
return jsonify(terms)
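# Fetch the dataset readme and attach its markdown text to the response payload.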
def inject_markdown(resp):
if "readme" in resp:
mark_req = requests.get(resp.get("readme"))
resp["markdown"] = mark_req.text
def inject_template_data(resp):
id_ = resp.get("id")
uri = resp.get("uri")
if id_ is None or uri is None:
return
parsed_uri = urlparse(uri)
bucket = parsed_uri.netloc
try:
response = s3.get_object(
Bucket=bucket,
Key="{}/files/template.json".format(id_),
RequestPayer="requester",
)
except ClientError:
# If the file is not under folder 'files', check under folder 'packages'
debugging = Config.SPARC_API_DEBUGGING == "TRUE"
if debugging:
logging.warning(
"Required file template.json was not found under /files folder, trying under /packages..."
)
try:
response = s3.get_object(
Bucket=bucket,
Key="{}/packages/template.json".format(id_),
RequestPayer="requester",
)
except ClientError as e:
if debugging:
logging.error(e)
return
template = response["Body"].read()
try:
template_json = json.loads(template)
except ValueError as e:
logging.error(e)
return
resp["study"] = {
"uuid": template_json.get("uuid"),
"name": template_json.get("name"),
"description": template_json.get("description"),
}
# Constructs a table where the keys are the normalized (lowercased) file types
# and the values are arrays of possible viewers
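# e.g. {"vtk": [viewer_a, viewer_b], "csv": [viewer_c]}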
def build_filetypes_table(osparc_viewers):
table = {}
for viewer in osparc_viewers:
filetype = viewer["file_type"].lower()
del viewer["file_type"]
if not table.get(filetype, False):
table[filetype] = []
table[filetype].append(viewer)
return table
@app.route("/sim/dataset/")
def sim_dataset(id_):
if request.method == "GET":
try:
req = requests.get("{}/datasets/{}".format(Config.DISCOVER_API_HOST, id_))
if req.ok:
json_data = req.json()
inject_markdown(json_data)
inject_template_data(json_data)
return jsonify(json_data)
except Exception as ex:
logging.error(f"Could not fetch SIM dataset {id_}", ex)
return abort(404, description="Resource not found")
@app.route("/sim/dataset//versions/")
def sim_dataset_versions(id_, version_):
if request.method == "GET":
try:
req = requests.get("{}/datasets/{}/versions/{}".format(Config.DISCOVER_API_HOST, id_, version_))
if req.ok:
json_data = req.json()
inject_markdown(json_data)
inject_template_data(json_data)
return jsonify(json_data)
except Exception as ex:
logging.error(f"Could not fetch SIM dataset {id_} version {version_}", ex)
return abort(404, description="Resource not found")
@app.route("/get_osparc_data")
def get_osparc_data():
return jsonify(osparc_data)
@app.route('/sim/service')
def osparc_search():
if request.method == 'GET':
search = request.args.get('search')
limit = request.args.get('limit', default=5, type=int)
skip = request.args.get('skip', default=0, type=int)
results = osparc_services.search_services(search, limit, skip)
return jsonify(results)
@app.route('/sim/file')
def osparc_extensions():
if request.method == 'GET':
extensions = osparc_services.file_extensions
return jsonify({
"file_viewers": extensions
})
@app.route("/project/", methods=["GET"])
def datasets_by_project_id(project_id):
datasets = get_associated_datasets(project_id)
if len(datasets['hits']) > 0:
return jsonify(datasets['hits'])
else:
abort(404, description="Resource not found")
@app.route("/get_featured_datasets_identifiers", methods=["GET"])
def get_featured_datasets_identifiers():
return {'identifiers': get_featured_datasets()}
@app.route("/get_featured_dataset", methods=["GET"])
@cache.cached(timeout=300)
def get_featured_dataset():
featured_dataset_id = get_featured_dataset_id_table_state(featuredDatasetIdSelectorTable)["featured_dataset_id"]
if featured_dataset_id == -1:
# In case there was an error while setting the id, just return a default dataset so the homepage does not break.
featured_dataset_id = 32
try:
response = requests.get("{}/datasets?ids={}".format(Config.DISCOVER_API_HOST, featured_dataset_id)).json()
# in case the dataset has been unpublished, just return default
if response['datasets'] == []:
response = requests.get("{}/datasets?ids={}".format(Config.DISCOVER_API_HOST, 32)).json()
return response
except Exception as ex:
logging.error(f"Could not get featured dataset {featured_dataset_id}", ex)
abort(404, description="An error occured while fetching the resource")
@app.route("/reva/subject-ids", methods=["GET"])
def getRevaSubjectIds():
try:
primary_folder = ps2.get(f'/packages/{Config.REVA_3D_TRACING_PRIMARY_FOLDER_COLLECTION_ID}')
primary_children = primary_folder['children']
subject_ids = []
for child in primary_children:
if child['content']['packageType'] == 'Collection':
subject_ids.append(child['content']['name'])
return jsonify({"status": "success", "ids": subject_ids}), 200
except Exception as e:
logging.error(f"Error while getting REVA subject id files: {e}")
return jsonify({"status": "Error while getting REVA subject id files: ", "message": e}), 500
def getRevaTracingInSituFolderChildren(subject_id):
try:
coordinates_folder_name = 'CoordinatesData'
in_situ_folder_name = 'InSitu'
primary_folder = ps2.get(f'/packages/{Config.REVA_3D_TRACING_PRIMARY_FOLDER_COLLECTION_ID}')
if not primary_folder:
msg = f"Primary folder not found: {Config.REVA_3D_TRACING_PRIMARY_FOLDER_COLLECTION_ID}"
logging.error(msg)
return abort(404, description=msg)
primary_children = primary_folder.get('children', [])
subject_child = next((child for child in primary_children if child['content']['name'] == subject_id), None)
if subject_child is None:
msg = f"Subject folder not found for subject id: {subject_id}"
logging.error(msg)
return abort(404, description=msg)
subject_folder = ps2.get(f"/packages/{subject_child['content']['id']}")
if not subject_folder:
msg = f"Subject folder could not be fetched for id: {subject_child['content']['id']}"
logging.error(msg)
return abort(404, description=msg)
subject_children = subject_folder.get('children', [])
coordinates_child = next((child for child in subject_children if child['content']['name'] == coordinates_folder_name), None)
if coordinates_child is None:
msg = f"CoordinatesData folder not found for subject: {subject_id}"
logging.error(msg)
return abort(404, description=msg)
coordinates_folder = ps2.get(f"/packages/{coordinates_child['content']['id']}")
if not coordinates_folder:
msg = f"CoordinatesData folder could not be fetched for id: {coordinates_child['content']['id']}"
logging.error(msg)
return abort(404, description=msg)
coordinates_children = coordinates_folder.get('children', [])
in_situ_child = next((child for child in coordinates_children if child['content']['name'] == in_situ_folder_name), None)
if in_situ_child is None:
msg = f"InSitu folder not found for subject: {subject_id}"
logging.error(msg)
return abort(404, description=msg)
# Get in situ folder
in_situ_folder = ps2.get(f"/packages/{in_situ_child['content']['id']}")
if not in_situ_folder:
msg = f"InSitu folder could not be fetched for id: {in_situ_child['content']['id']}"
logging.error(msg)
return abort(404, description=msg)
return in_situ_folder.get('children', [])
except Exception as e:
msg = f"Exception thrown when getting Reva InSitu Folder: {e}"
logging.error(msg)
return abort(500, description=msg)
@app.route("/reva/anatomical-landmarks-files/", methods=["GET"])
def getRevaAnatomicalLandmarksFiles(subject_id):
try:
anatomical_landmarks_folder_name = 'AnatomicalLandmarks'
in_situ_children = getRevaTracingInSituFolderChildren(subject_id)
anatomical_landmarks_child = next((child for child in in_situ_children if child['content']['name'] == anatomical_landmarks_folder_name), None)
if anatomical_landmarks_child is None:
logging.error(f"REVA tracing folder {anatomical_landmarks_folder_name} not found for subject: {subject_id}")
return jsonify({"status": "ERROR", "message": f"{anatomical_landmarks_folder_name} folder not found for subject: {subject_id}"}), 404
anatomical_landmarks_folder = ps2.get(f"/packages/{anatomical_landmarks_child['content']['id']}")
anatomical_landmarks_children = anatomical_landmarks_folder['children']
anatomical_landmarks_folders = []
for anatomical_landmark_child in anatomical_landmarks_children:
landmark_folder_name = anatomical_landmark_child['content']['name']
landmark_folder_id = anatomical_landmark_child['content']['id']
anatomical_landmark_folder = ps2.get(f"/packages/{landmark_folder_id}")
landmark_children = anatomical_landmark_folder['children']
landmark_files = []
for landmark_child in landmark_children:
landmark_file_package_id = landmark_child['content']['id']
landmark_file = ps2.get(f"/packages/{landmark_file_package_id}/view")
landmark_file_id = landmark_file[0]['content']['id']
landmark_file_presigned_url = ps2.get(f"/packages/{landmark_file_package_id}/files/{landmark_file_id}")['url']
landmark_files.append({'name': str(landmark_child['content']['name']), 's3Url': str(landmark_file_presigned_url)})
anatomical_landmarks_folders.append({'name': str(landmark_folder_name), 'files': landmark_files})
return jsonify({"status": "success", "folders": anatomical_landmarks_folders}), 200
except Exception as e:
logging.error(f"Error while getting REVA anatomical landmarks files {e}")
return jsonify({"status": "Error while getting anatomical landmarks files: ", "message": e}), 500
@app.route("/reva/tracing-files/", methods=["GET"])
def getRevaTracingFiles(subject_id):
try:
vagus_nerve_folder_name = 'VagusNerve'
in_situ_children = getRevaTracingInSituFolderChildren(subject_id)
vagus_nerve_child = next((child for child in in_situ_children if child['content']['name'] == vagus_nerve_folder_name), None)
if vagus_nerve_child is None:
logging.error(f"REVA tracing folder {vagus_nerve_folder_name} not found for subject: {subject_id}")
return jsonify({"status": "ERROR", "message": f"{vagus_nerve_folder_name} folder not found for subject: {subject_id}"}), 404
vagus_nerve_folder = ps2.get(f"/packages/{vagus_nerve_child['content']['id']}")
vagus_nerve_children = vagus_nerve_folder['children']
vagus_tracing_files = []
for vagus_region_child in vagus_nerve_children:
vagus_region_folder = ps2.get(f"/packages/{vagus_region_child['content']['id']}")
# get file and use id and package id for getting the presigned url https://api.pennsieve.io/packages/{id}/view
vagus_region_children = vagus_region_folder['children']
for vagus_file_child in vagus_region_children:
file_package_id = vagus_file_child['content']['id']
vagus_file = ps2.get(f"/packages/{file_package_id}/view")
vagus_file_id = vagus_file[0]['content']['id']
vagus_file_presigned_url = ps2.get(f"/packages/{file_package_id}/files/{vagus_file_id}")['url']
vagus_tracing_files.append(
{'name': str(vagus_file_child['content']['name']), 'region': str(vagus_region_child['content']['name']), 's3Url': str(vagus_file_presigned_url)})
return jsonify({"status": "success", "files": vagus_tracing_files}), 200
except Exception as e:
logging.error(f"Error while getting REVA tracing files {e}")
return jsonify({"status": "Error while getting tracing files: ", "message": e}), 500
@app.route("/reva/micro-ct-files/", methods=["GET"])
def getRevaMicroCtFiles(subject_id):
micro_ct_visualization_folder_name = f'{subject_id}-MicroCTVisualization'
try:
primary_folder = ps2.get(f'/packages/{Config.REVA_MICRO_CT_PRIMARY_FOLDER_COLLECTION_ID}')
primary_children = primary_folder['children']
subject_child = next((child for child in primary_children if child['content']['name'] == subject_id), None)
if subject_child is None:
logging.error(f'REVA microCT folder not found with subject id: {subject_id}')
return jsonify({"status": "ERROR", "message": f"MicroCT folder not found with subject id: {subject_id}"}), 404
subject_folder = ps2.get(f"/packages/{subject_child['content']['id']}")
subject_children = subject_folder['children']
micro_ct_child = next((child for child in subject_children if child['content']['name'] == micro_ct_visualization_folder_name), None)
if micro_ct_child is None:
logging.error(f'REVA microCT {micro_ct_visualization_folder_name} folder not found for subject: {subject_id}')
return jsonify({"status": "ERROR", "message": f"{micro_ct_visualization_folder_name} folder not found for subject: {subject_id}"}), 404
micro_ct_visualization_folder = ps2.get(f"/packages/{micro_ct_child['content']['id']}")
micro_ct_children = micro_ct_visualization_folder['children']
micro_ct_files = []
for micro_child in micro_ct_children:
file_package_id = micro_child['content']['id']
micro_child_file = ps2.get(f"/packages/{file_package_id}/view")
micro_child_file_id = micro_child_file[0]['content']['id']
micro_file_presigned_url = ps2.get(f"/packages/{file_package_id}/files/{micro_child_file_id}")['url']
file_name = micro_child['content']['name']
file_size = micro_child['storage']
package_type = micro_child['content']['packageType']
file_type = micro_child_file[0]['content']['fileType']
micro_ct_files.append(
{'name': str(file_name), 's3Url': str(micro_file_presigned_url), 'type': str(file_type), 'packageType': str(package_type), 'size': str(file_size)})
return jsonify({"status": "success", "files": micro_ct_files}), 200
except Exception as e:
logging.error(f"Error while getting REVA microCT files {e}")
return jsonify({"status": "Error while getting microCT files: ", "message": e}), 500
@app.route("/get_owner_email/", methods=["GET"])
def get_owner_email(owner_id):
# Filter to find user based on provided int id
org = ps._api._organization
members = ps._api.organizations.get_members(org)
res = [x for x in members if x.int_id == owner_id]
if not res:
abort(404, description="Owner not found")
else:
return jsonify({"email": res[0].email})
# Get information of the latest body scaffold for species.
# This endpoint returns the metadata file path, bucket,
# dataset id and version which can be used to construct the url
@app.route("/get_body_scaffold_info/", methods=["GET"])
def get_body_scaffold_info(species):
id = get_body_scaffold_dataset_id(species)
if id:
query = create_pennsieve_identifier_query(id)
result = process_get_first_scaffold_info(dataset_search(query))
if result:
return result
return abort(404, description=f"Whole body info not found for {species}")
@app.route("/thumbnail/", methods=["GET"])
def thumbnail_by_image_id(image_id, recursive_call=False):
bl = Biolucida()
try:
with biolucida_lock:
if not bl.token():
authenticate_biolucida()
url = Config.BIOLUCIDA_ENDPOINT + "/thumbnail/{0}".format(image_id)
headers = {
'token': bl.token(),
}
response = requests.request("GET", url, headers=headers)
encoded_content = base64.b64encode(response.content)
# Response from this endpoint is binary on success so the easiest thing to do is
# check for an error response in encoded form.
if encoded_content == b'eyJzdGF0dXMiOiJBZG1pbiB1c2VyIGF1dGhlbnRpY2F0aW9uIHJlcXVpcmVkIHRvIHZpZXcvZWRpdCB1c2VyIGluZm8uIFlvdSBtYXkgbmVlZCB0byBsb2cgb3V0IGFuZCBsb2cgYmFjayBpbiB0byByZXZlcmlmeSB5b3VyIGNyZWRlbnRpYWxzLiJ9' \
and not recursive_call:
# Authentication failure, try again after resetting token.
with biolucida_lock:
bl.set_token('')
encoded_content = thumbnail_by_image_id(image_id, True)
return encoded_content
except Exception as ex:
logging.error(f"Could not get the thumbnail for {image_id}", ex)
return abort(404, "An error occured while fetching the thumbnail")
@app.route("/image/", methods=["GET"])
def image_info_by_image_id(image_id):
url = Config.BIOLUCIDA_ENDPOINT + "/image/info/{0}".format(image_id)
try:
response = requests.request("GET", url)
return process_biolucida_result(response.json())
except Exception as ex:
logging.error(f"Could not get image info for {image_id}", ex)
return abort(404, "An error occured while getting the image's info")
@app.route("/image_search/", methods=["GET"])
def image_search_by_dataset_id(dataset_id):
url = Config.BIOLUCIDA_ENDPOINT + "/imagemap/search_dataset/discover/{0}".format(dataset_id)
try:
response = requests.request("GET", url)
return response.json()
except Exception as ex:
logging.error(f"Could not search images for dataset {dataset_id}", ex)
return {"error": "An error occured while searching images for dataset"}, 404
@app.route("/image_xmp_info/", methods=["GET"])
def image_xmp_info(image_id):
url = Config.BIOLUCIDA_ENDPOINT + "/image/xmpmetadata/{0}".format(image_id)
try:
result = requests.request("GET", url)
except requests.exceptions.ConnectionError:
return abort(400, description="Unable to make a connection to Biolucida.")
response = result.json()
if response['status'] == 'success':
return process_biolucida_results(response['data'])
return abort(400, description=f"XMP info not found for {image_id}")
@app.route("/image_blv_link/", methods=["GET"])
def image_blv_link(image_id):
url = Config.BIOLUCIDA_ENDPOINT + "/image/blv_link/{0}".format(image_id)
try:
result = requests.request("GET", url)
except requests.exceptions.ConnectionError:
return abort(400, description="Unable to make a connection to Biolucida.")
response = result.json()
if response['status'] == 'success':
return jsonify({'link': response['link']})
return abort(400, description=f"BLV link not found for {image_id}")
def authenticate_biolucida():
bl = Biolucida()
url = Config.BIOLUCIDA_ENDPOINT + "/authenticate"
payload = {'username': Config.BIOLUCIDA_USERNAME,
'password': Config.BIOLUCIDA_PASSWORD,
'token': ''}
files = [
]
headers = {}
response = requests.request("POST", url, headers=headers, data=payload, files=files)
if response.status_code == requests.codes.ok:
content = response.json()
bl.set_token(content['token'])
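# Shared helper for the */getshareid routes: persist the posted state in the given table and return its uuid.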
def get_share_link(table):
# Commit to database even when testing since the Table re-creates a new session each time to prevent stale sessions
commit = True
if table:
json_data = request.get_json()
if json_data and 'state' in json_data:
state = json_data['state']
uuid = table.pushState(state, commit)
return jsonify({"uuid": uuid})
abort(400, description="State not specified")
else:
abort(404, description="Database not available")
def get_saved_state(table):
if table:
json_data = request.get_json()
if json_data and 'uuid' in json_data:
uuid = json_data['uuid']
state = table.pullState(uuid)
if state:
return jsonify({"state": table.pullState(uuid)})
abort(400, description="Key missing or did not find a match")
else:
abort(404, description="Database not available")
# Get the share link for the current map content.
@app.route("/annotation/getshareid", methods=["POST"])
def get_annotation_share_link():
return get_share_link(annotationtable)
# Get the map state using the share link id.
@app.route("/annotation/getstate", methods=["POST"])
def get_annotation_state():
return get_saved_state(annotationtable)
# Get the share link for the current map content.
@app.route("/map/getshareid", methods=["POST"])
def get_map_share_link():
return get_share_link(maptable)
# Get the map state using the share link id.
@app.route("/map/getstate", methods=["POST"])
def get_map_state():
return get_saved_state(maptable)
# Get the share link for the current map content.
@app.route("/scaffold/getshareid", methods=["POST"])
def get_scaffold_share_link():
return get_share_link(scaffoldtable)
# Get the map state using the share link id.
@app.route("/scaffold/getstate", methods=["POST"])
def get_scaffold_state():
return get_saved_state(scaffoldtable)
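# Validate a Cloudflare Turnstile captcha token against Config.TURNSTILE_URL; returns True only on success.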
def verify_recaptcha(token):
try:
captchaReq = requests.post(
url=Config.TURNSTILE_URL,
json={
"secret": Config.NUXT_TURNSTILE_SECRET_KEY,
"response": token
}
)
captchaResp = captchaReq.json()
if "success" not in captchaResp or not captchaResp["success"]:
return {"error": "Failed Captcha Validation"}, 409
return captchaResp.get('success', False)
except Exception as ex:
logging.error("Could not validate captcha, bypassing validation", ex)
def create_github_issue(title, body, labels=None, assignees=None):
url = f"https://api.github.com/repos/{Config.SPARC_GITHUB_ORG}/{Config.SPARC_ISSUES_GITHUB_REPO}/issues"
headers = {
"Authorization": f"token {Config.SPARC_TECH_LEADS_GITHUB_TOKEN}",
"Accept": "application/vnd.github+json"
}
data = {
"title": title,
"body": body,
}
if labels:
data["labels"] = labels
if assignees:
data["assignees"] = assignees
response = requests.post(url, json=data, headers=headers)
if response.status_code == 201:
response_json = response.json()
return {
"html_url": response_json["html_url"],
"comments_url": response_json["comments_url"],
"issue_api_url": response_json["url"]
}
else:
raise Exception(f"GitHub Issue creation failed: {response.text}")
@app.route("/create_issue", methods=["POST"])
def create_issue():
form = request.form
recaptcha_token = request.form.get('captcha_token')
if not app.config['TESTING'] and (not recaptcha_token or not verify_recaptcha(recaptcha_token)):
return jsonify({'error': 'Invalid reCAPTCHA'}), 400
task_type = form.get("type", "bug")
title = form.get("title")
issue_body = form.get("body")
if not title or not issue_body:
abort(400, description="Missing title or body")
email = form.get("email", "").strip()
if task_type in ["bug", "feedback", "test"]:
try:
issue = create_github_issue(title.strip(), issue_body, labels=[task_type], assignees=Config.GITHUB_ISSUE_ASSIGNEES)
issue_url = issue['html_url']
comments_url = issue['comments_url']
issue_api_url = issue['issue_api_url']
except Exception as e:
return jsonify({"error": str(e)}), 500
else:
return jsonify({"error": f"Unsupported task type: {task_type}"}), 400
# default to this if there is no issue_url
response_message = 'Submission could not be created'
status_code = 500
response_status = 'error'
if (issue_url):
response_message = 'Submission created successfully. '
status_code = 201
response_status = 'success'
files = request.files
# host the file on the dummy sparc repo and add the viewable url as a comment to the newly created ticket
if files and 'attachment' in files:
attachment = files['attachment']
file_content = attachment.read()
file_name = attachment.filename
timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
unique_id = uuid.uuid4().hex
unique_filename = f"{timestamp}_{unique_id}_{file_name}"
url = f"https://api.github.com/repos/{Config.SPARC_GITHUB_ORG}/{Config.SPARC_ISSUES_GITHUB_REPO}/contents/attachments/{unique_filename}"
headers = {
"Authorization": f"token {Config.SPARC_TECH_LEADS_GITHUB_TOKEN}",
"Accept": "application/vnd.github+json",
"X-GitHub-Api-Version": "2022-11-28"
}
encoded_content = base64.b64encode(file_content).decode('utf-8')
data = {
"message": f"Add file {unique_filename}",
"content": encoded_content
}
try:
response = requests.put(url, headers=headers, json=data)
if response.status_code in (200, 201):
json_response = response.json()
image_url = json_response["content"]["download_url"]
comment_body = f""
headers = {
"Authorization": f"token {Config.SPARC_TECH_LEADS_GITHUB_TOKEN}",
"Accept": "application/vnd.github+json"
}
data = {
"body": comment_body
}
response = requests.post(comments_url, json=data, headers=headers)
if response.status_code != 201:
response_message += 'File attachment unsuccessful. '
status_code = 201
response_status = 'warning'
else:
response_message += 'File attachment successful. '
else:
response_message += 'File upload unsuccessful. '
status_code = 201
response_status = 'warning'
except Exception as e:
response_message += 'File upload unsuccessful. '
status_code = 201
response_status = 'warning'
if email:
# default to bug form if task type not specified
subject = 'SPARC Reported Issue Submission'
email_body = issue_reporting_email.substitute({'message': issue_body})
if (task_type == "feedback"):
subject = 'SPARC Reported Feedback Submission'
email_body = feedback_email.substitute({'message': issue_body})
html_body = markdown.markdown(email_body)
try:
email_sender.mailersend_email(Config.SES_SENDER, email, subject, html_body)
            response_message += 'Confirmation email sent to user successfully. '
        except Exception as e:
            response_message += 'Confirmation email to user was unsuccessful. '
status_code = 201
response_status = 'warning'
return jsonify({"message": response_message, "url": issue_url, "issue_api_url": issue_api_url, "status": response_status}), status_code
def get_hubspot_contact(email, firstname, lastname):
search_url = f"{Config.HUBSPOT_V3_API}/objects/contacts/search"
search_body = {
"filterGroups": [
{
"filters": [
{
"propertyName": "email",
"operator": "EQ",
"value": email
}
]
}
],
"properties": ["email"],
"limit": 1
}
headers = {
"Content-Type": "application/json",
"Authorization": "Bearer " + Config.HUBSPOT_API_TOKEN
}
search_results = requests.post(search_url, headers=headers, json=search_body)
search_data = search_results.json()
contact_id = None
if search_data.get("results"):
contact_id = search_data["results"][0]["id"]
else:
# Create contact if not found
create_contact_url = f"{Config.HUBSPOT_V3_API}/objects/contacts"
contact_body = {
"properties": {
"email": email,
"firstname": firstname,
"lastname": lastname
}
}
create_res = requests.post(create_contact_url, headers=headers, json=contact_body)
if not create_res.ok:
raise Exception(f"Hubspot contact creation failed: {create_res.status_code} {create_res.text}")
contact_id = create_res.json()["id"]
return contact_id
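# Create a HubSpot deal in the given pipeline/stage and return the new deal id.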
def create_hubspot_deal(name, stage, pipeline, lead_source=None):
headers = {
"Content-Type": "application/json",
"Authorization": "Bearer " + Config.HUBSPOT_API_TOKEN
}
create_deal_url = f"{Config.HUBSPOT_V3_API}/objects/deals"
deal_body = {
"properties": {
"dealname": name,
"dealstage": stage,
"pipeline": pipeline,
"lead_source_in_deal": lead_source
}
}
deal_res = requests.post(create_deal_url, headers=headers, json=deal_body)
if not deal_res.ok:
raise Exception(f"Hubspot deal creation failed: {deal_res.status_code} {deal_res.text}")
deal_id = deal_res.json()["id"]
return deal_id
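# Create a HubSpot note and associate it with both the deal and the contact; returns the note id.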
def create_hubspot_note(body, deal_id, contact_id):
headers = {
"Content-Type": "application/json",
"Authorization": "Bearer " + Config.HUBSPOT_API_TOKEN
}
note_url = f"{Config.HUBSPOT_V3_API}/objects/notes"
hs_timestamp = int(datetime.utcnow().timestamp() * 1000)
note_payload = {
"properties": {
"hs_note_body": body,
"hs_timestamp": hs_timestamp
}
}
note_res = requests.post(note_url, headers=headers, json=note_payload)
if not note_res.ok:
raise Exception(f"HubSpot note creation failed: {note_res.status_code} {note_res.text}")
note_id = note_res.json()["id"]
# Step 2: Associate the note to deal
associate_note_to_deal_url = f"{Config.HUBSPOT_V3_API}/objects/notes/{note_id}/associations/deals/{deal_id}/note_to_deal"
associate_res_deal = requests.put(associate_note_to_deal_url, headers=headers)
if not associate_res_deal.ok:
raise Exception(f"Failed to associate note to deal: {associate_res_deal.status_code} {associate_res_deal.text}")
# Step 3: Associate the note to contact
associate_note_to_contact_url = f"{Config.HUBSPOT_V3_API}/objects/notes/{note_id}/associations/contacts/{contact_id}/note_to_contact"
associate_res_contact = requests.put(associate_note_to_contact_url, headers=headers)
if not associate_res_contact.ok:
raise Exception(f"Failed to associate note to contact: {associate_res_contact.status_code} {associate_res_contact.text}")
return note_id
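# Associate an existing HubSpot deal with a contact via the associations endpoint.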
def associate_hubspot_deal_with_contact(deal_id, contact_id):
headers = {
"Content-Type": "application/json",
"Authorization": "Bearer " + Config.HUBSPOT_API_TOKEN
}
associate_url = f"{Config.HUBSPOT_V3_API}/objects/deals/{deal_id}/associations/contacts/{contact_id}/deal_to_contact"
assoc_res = requests.put(associate_url, headers=headers)
if not assoc_res.ok:
raise Exception(f"HubSpot deal to contact association failed: {assoc_res.status_code} {assoc_res.text}")
return assoc_res.json()
@app.route("/submit_data_inquiry", methods=["POST"])
def submit_data_inquiry():
form = request.form
recaptcha_token = request.form.get('captcha_token')
if not app.config['TESTING'] and (not recaptcha_token or not verify_recaptcha(recaptcha_token)):
return jsonify({'error': 'Invalid reCAPTCHA'}), 400
email = form.get("email", "").strip()
firstname = form.get("firstname", "").strip()
lastname = form.get("lastname", "").strip()
task_type = form.get("type", "")
is_anbc_form = form.get("isAnbcForm", "false")
title = form.get("title").strip()
body = form.get("body").strip()
is_service_form = form.get("isServiceForm", "false")
if not title or not body or not email or not firstname or not lastname:
return jsonify({"error": "Missing title, body, email, first name, or last name"}), 400
if task_type not in ["research","interest"]:
return jsonify({"error": f"Unsupported task type: {task_type}"}), 400
contact_id = None
deal_id = None
note_id = None
deal_pipeline = Config.HUBSPOT_ONBOARDING_PIPELINE_ID if task_type == "research" else Config.HUBSPOT_GRANT_SEEKER_PIPELINE_ID
deal_stage = Config.HUBSPOT_ONBOARDING_PIPELINE_INITIAL_STAGE_ID if task_type == "research" else Config.HUBSPOT_GRANT_SEEKER_PIPELINE_INITIAL_STAGE_ID
deal_lead_source = Config.ANBC_LEAD_SOURCE if is_anbc_form == 'true' else None
partial_success = {}
try:
contact_id = get_hubspot_contact(email, firstname, lastname)
except Exception as e:
return jsonify({
"error": "Failed to create or retrieve contact. ",
"details": str(e)
}), 500
try:
deal_id = create_hubspot_deal(title, deal_stage, deal_pipeline, deal_lead_source)
except Exception as e:
return jsonify({
"error": "Failed to create deal. ",
"contact_id": contact_id,
"details": str(e)
}), 500
try:
associate_hubspot_deal_with_contact(deal_id, contact_id)
except Exception as e:
return jsonify({
"error": "Failed to associate deal with contact. ",
"contact_id": contact_id,
"deal_id": deal_id,
"details": str(e)
}), 500
try:
# Create a note containing the form body and associate it to the contact and deal
note_id = create_hubspot_note(body, deal_id, contact_id)
except Exception as e:
# Don't fail the whole submission — just inform the user
partial_success = {
"warning": "Request successfully submitted, but note creation failed. ",
"contact_id": contact_id,
"deal_id": deal_id,
"details": str(e)
}
response = {
"message": "Request successfully submitted. ",
"status": "success",
"contact_id": contact_id,
"deal_id": deal_id,
"note_id": note_id
}
if email:
subject = 'SPARC Form Submission Confirmation'
email_body = ''
if is_service_form == 'true':
email_body = service_form_submission_request_confirmation_email.substitute({'name': firstname, 'message': body})
else:
email_body = anbc_form_creation_request_confirmation_email.substitute({'name': firstname, 'message': body}) if is_anbc_form == 'true' else creation_request_confirmation_email.substitute({'name': firstname, 'message': body})
html_body = markdown.markdown(email_body)
try:
email_sender.mailersend_email(Config.SES_SENDER, email, subject, html_body)
response['message'] = response.get('message', '') + 'Confirmation email sent to user successfully. '
if partial_success:
partial_success['warning'] = partial_success.get('warning', '') + 'Confirmation email sent to user successfully. '
except Exception as e:
if partial_success:
                partial_success['warning'] = partial_success.get('warning', '') + 'Confirmation email to user was unsuccessful. '
partial_success['details'] = partial_success.get('details', '') + str(e)
else:
partial_success = {
"warning": "Request successfully submitted, but confirmation email sent to user unsuccessful.",
"contact_id": contact_id,
"deal_id": deal_id,
"details": str(e)
}
if partial_success:
response.update(partial_success)
return jsonify(response), 207
return jsonify(response), 201
@app.route("/tasks", methods=["POST"])
def report_form_submission():
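    """Record a community submission (news, event, tool/resource, or story).

    Validates the Turnstile captcha token, uploads any attachment to Google Drive,
    appends the submission to a spreadsheet via gspread, and sends a confirmation
    email to the submitter when an email address is provided.
    """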
form = request.form
if "captcha_token" in form:
try:
captchaReq = requests.post(
url=Config.TURNSTILE_URL,
json={
"secret": Config.NUXT_TURNSTILE_SECRET_KEY,
"response": form["captcha_token"]
}
)
captchaResp = captchaReq.json()
if "success" not in captchaResp or not captchaResp["success"]:
return {"error": "Failed Captcha Validation"}, 409
except Exception as ex:
logging.error("Could not validate captcha, bypassing validation", ex)
elif not app.config['TESTING']:
return {"error": "Failed Captcha Validation"}, 409
# Captcha all good
has_attachment = False
image_id = uuid.uuid4()
response = None
if 'attachment' in request.files:
has_attachment = True
file = request.files["attachment"]
drive_client = init_drive_client()
response = upload_file(drive_client, file, str(image_id) + get_extension(file.filename))
client = init_gspread_client()
description = form["description"]
if has_attachment and response and response['webViewLink']:
description += "\n\nAttachment: " + response['webViewLink']
if append_contact(client, [form["title"], None, None, None, None, None, description]):
# Send a confirmation email to the user
if 'userEmail' in form and form['userEmail']:
user_email = form['userEmail']
name = form["firstName"] or user_email
subject = 'SPARC Submission'
body = creation_request_confirmation_email.substitute({
'name': name,
'message': description
})
task_type = ''
if form and 'type' in form:
task_type = form["type"]
if task_type == "news":
subject = 'SPARC News Submission'
elif task_type == "event":
subject = 'SPARC Event Submission'
elif task_type == "toolsAndResources":
subject = 'SPARC Tool/Resource Submission'
elif task_type == "communitySpotlight":
subject = 'SPARC Story Submission'
if len(user_email) > 0 and subject and body:
                email_sender.mailersend_email(Config.SES_SENDER, user_email, subject, body.replace('\n', '<br>'))
return {'attachment_filename': image_id if has_attachment else ''}, 200
return {"error": "Failed registering user data"}, 500
@app.route("/hubspot_contact_properties/", methods=["GET"])
def get_hubspot_contact_properties(email):
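    """Look up a HubSpot contact by email address and return its firstname, lastname,
    email, newsletter, and event_name properties, mapping HubSpot error responses onto
    equivalent JSON error responses.
    """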
url = f"{Config.HUBSPOT_V3_API}/objects/contacts/{email}?archived=false&idProperty=email&properties=firstname,lastname,email,newsletter,event_name"
headers = {
"Content-Type": "application/json",
"Authorization": "Bearer " + Config.HUBSPOT_API_TOKEN
}
try:
response = requests.get(url, headers=headers)
# Handle successful responses (2xx)
if response.status_code == 200:
return jsonify(response.json())
# Handle not found (404)
elif response.status_code == 404:
return jsonify({
"error": "Contact not found",
"message": f"No contact with the email '{email}' was found in HubSpot."
}), 404
# Handle other non-success status codes
else:
return jsonify({
"error": "Failed to fetch contact",
"message": f"HubSpot API responded with status code {response.status_code}.",
"details": response.json() if response.headers.get("Content-Type") == "application/json" else response.text
}), response.status_code
except requests.RequestException as ex:
# Handle exceptions raised by the requests library
return jsonify({
"error": "RequestException",
"message": f"Could not get contact with email '{email}' due to a request error.",
"details": str(ex)
}), 500
except Exception as ex:
# Handle other unexpected exceptions
return jsonify({
"error": "Internal Server Error",
"message": f"An unexpected error occurred while fetching the contact with email '{email}'.",
"details": str(ex)
}), 500
@app.route("/subscribe_to_newsletter", methods=["POST"])
def subscribe_to_newsletter():
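    """Subscribe an email address to the newsletter via HubSpot.

    The 'newsletter' contact property is treated as a semicolon-separated list of
    opt-ins: existing values are fetched first, 'Newsletter' is appended if missing,
    and the contact is then upserted through HubSpot's batch upsert endpoint keyed
    on the email address.
    """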
data = request.json
email = data.get('email_address')
first_name = data.get('first_name')
last_name = data.get('last_name')
# Ensure the required `email` field is present
if not email:
return jsonify({'error': 'Email is required'}), 400
newsletter_property = ''
    try:
        # get_hubspot_contact_properties is a view function: it returns a Response on
        # success and a (Response, status_code) tuple on error, so normalise both cases
        result = get_hubspot_contact_properties(email)
        contact_response, status_code = result if isinstance(result, tuple) else (result, result.status_code)
        if status_code == 200:
            newsletter_property = contact_response.get_json().get('properties', {}).get('newsletter', None)
        else:
            logging.error(f"Unexpected response from HubSpot: {contact_response.get_data(as_text=True)}")
            raise Exception(f"Unexpected response from HubSpot: {contact_response.get_data(as_text=True)}")
    except Exception as e:
        logging.error(f"Error while retrieving contact properties for email {email}: {e}")
current_newsletter_values = []
if isinstance(newsletter_property, str):
current_newsletter_values = newsletter_property.split(';')
# remove possible empty string
current_newsletter_values = list(filter(None, current_newsletter_values))
# Append the Newsletter value if it's not already in the array
if 'Newsletter' not in current_newsletter_values:
current_newsletter_values.append('Newsletter')
payload = {
"inputs": [
{
"properties": {
"email": email,
"firstname": first_name,
"lastname": last_name,
"newsletter": ';'.join(current_newsletter_values)
},
"id": email,
"idProperty": "email"
}
]
}
url = f"{Config.HUBSPOT_V3_API}/objects/contacts/batch/upsert"
headers = {
"Content-Type": "application/json",
'Authorization': 'Bearer ' + Config.HUBSPOT_API_TOKEN
}
# Send request to HubSpot API
try:
response = requests.post(url, json=payload, headers=headers)
response.raise_for_status() # Raise an HTTPError for bad responses (4xx or 5xx)
return jsonify(response.json()), 200
except requests.exceptions.RequestException as e:
return jsonify({'error': str(e)}), 500
def get_contact_properties(object_id):
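    """Fetch a HubSpot contact by object id using the HubSpot SDK and return its email,
    firstname, lastname, and the combined newsletter/event_name tags. The tags
    correspond to EmailOctopus mailing lists the contact belongs to.
    """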
client = hubspot.Client.create(access_token=Config.HUBSPOT_API_TOKEN)
try:
contact_data = client.crm.contacts.basic_api.get_by_id(contact_id=str(object_id), properties_with_history=["firstname", "lastname", "email", "newsletter", "event_name"],
archived=False)
except ApiException as e:
return abort(400, description=f"Exception thrown when getting contact properties: {e}")
if not contact_data:
return abort(400, description="Failed to retrieve contact data from HubSpot.")
if not contact_data.properties_with_history:
return abort(400, description="Contact properties not found")
if not contact_data.properties_with_history.get("email"):
return abort(400, description="Contact Email property not found")
email = contact_data.properties_with_history.get("email")[0].value
firstname_data = contact_data.properties_with_history.get("firstname", [{}])[0]
firstname = firstname_data.value if firstname_data else ""
lastname_data = contact_data.properties_with_history.get("lastname", [{}])[0]
lastname = lastname_data.value if lastname_data else ""
# The newsletter array contains tags where each one corresponds to a mailing list in EmailOctopus that a user can opt-in/out of
    newsletter_tags_data = contact_data.properties_with_history.get("newsletter") or []
    if len(newsletter_tags_data) > 0:
        newsletter_tags_data = newsletter_tags_data[0]
    newsletter_tags = newsletter_tags_data.value.split(";") if newsletter_tags_data else []
# The events array contains tags where each one corresponds to a mailing list in EmailOctopus that a user cannot opt-in/out of
    events_tags_data = contact_data.properties_with_history.get("event_name") or []
    if len(events_tags_data) > 0:
        events_tags_data = events_tags_data[0]
    events_tags = events_tags_data.value.split(";") if events_tags_data else []
# Filter out empty strings from the combined list
tags = [tag for tag in (newsletter_tags + events_tags) if tag]
return {
'email': email,
'firstname': firstname,
'lastname': lastname,
'tags': tags
}
def add_or_update_emailoctopus_contact(list_id, email, firstname, lastname, tags, status):
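    """Create or update a contact in an EmailOctopus list via a PUT to its contacts endpoint.

    `tags` is passed through to EmailOctopus unchanged; callers here pass either a list
    of tag names or a {tag: bool} mapping of tags to add/remove (see the webhook handler
    below).
    """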
url = f"https://api.emailoctopus.com/lists/{list_id}/contacts"
headers = {
"Content-Type": "application/json",
'Authorization': 'Bearer ' + Config.EMAIL_OCTOPUS_API_KEY
}
payload = {
"email_address": email,
"fields": {"FirstName": firstname, "LastName": lastname},
"status": status,
"tags": tags
}
try:
response = requests.put(url, json=payload, headers=headers)
        if response.status_code != 200:
logging.error(f'Emailoctopus contact did not get added/updated for email: {email}. Returned a response of {response.status_code}: {response.text}')
return response.json()
except Exception as ex:
logging.error(f"Could not add or update contact with email address: {email} in emailoctopus list: {list_id}", ex)
return abort(500, description=f"Could not add/update contact with email address: {email} from emailoctopus list with ID: {list_id} due to the following error: {ex}")
@app.route("/hubspot_webhook", methods=["POST"])
def hubspot_webhook():
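    """Receive HubSpot webhook events and sync the affected contacts to EmailOctopus.

    Requests are authenticated with HubSpot's v3 signature scheme: the request method,
    full URL, body, and X-HubSpot-Request-Timestamp header are concatenated, hashed with
    HMAC-SHA256 using the client secret, base64-encoded, and compared in constant time
    against X-HubSpot-Signature-V3. Timestamps older than five minutes are rejected.
    Events are then processed on worker threads so the webhook can be acknowledged
    immediately.
    """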
body = None
try:
body = request.get_json(force=True)
except Exception as e:
logging.error(f"Invalid JSON body: {e}")
return jsonify({"error": "Invalid JSON format"}), 400
if not isinstance(body, list) or not body:
logging.error(f'Expected an array of webhook events: {body}')
return jsonify({"error": "Expected a non-empty JSON array"}), 400
app.logger.info(f'Received Hubspot webhook request: {request}')
app.logger.info(f'Hubspot webhook request body: {body}')
if 'X-HubSpot-Request-Timestamp' not in request.headers or 'X-HubSpot-Signature-V3' not in request.headers:
logging.error(f'Required signature header(s) not present in the following request headers: {request.headers}')
return jsonify({"error": f"Required signature header(s) not present in the following request headers: {request.headers}"}), 400
signature_header = request.headers.get("X-HubSpot-Signature-V3")
timestamp_header = request.headers["X-HubSpot-Request-Timestamp"]
try:
signature_timestamp = int(timestamp_header)
except ValueError:
logging.error(f'Invalid signature timestamp format: {timestamp_header}')
return jsonify({"error": "Invalid signature timestamp format"}), 400
try:
current_time = int(time.time())
if current_time - signature_timestamp > 300:
logging.error(f'Signature timestamp is older than 5 minutes: current time = {current_time}, signature time = {signature_timestamp}')
return jsonify({'error': 'Signature timestamp is older than 5 minutes'}), 400
# Concatenate request method, URI, body, and header timestamp
url = request.url
method = 'POST'
stringified_body = json.dumps(body, separators=(",", ":"))
raw_string = f"{method}{url}{stringified_body}{timestamp_header}"
# Create HMAC SHA-256 hash from the raw string, then base64-encode it
hashed_signature = hmac.new(
Config.HUBSPOT_CLIENT_SECRET.encode('utf-8'),
raw_string.encode('utf-8'),
hashlib.sha256
).digest()
base64_hashed_signature = base64.b64encode(hashed_signature).decode('utf-8')
        # Constant-time comparison of the computed signature with the signature header
        if not hmac.compare_digest(base64_hashed_signature, signature_header):
            logging.error('Signature is invalid')
return jsonify({"error": "Signature is invalid"}), 401
except Exception as ex:
logging.error(f'Internal error when validating Hubspot webhook request signature: {ex}')
return jsonify({"error": f"Internal error when validating Hubspot webhook request signature: {ex}"}), 500
    # Process each event in a separate thread so the acknowledgement can be sent to HubSpot promptly without blocking the API server
def process_event(event):
with app.app_context():
subscription_type = event.get("subscriptionType")
object_id = event.get("objectId")
if subscription_type is None or object_id is None:
logging.error(f"Missing required keys in event: {event}")
return
contact_data = None
try:
# HubSpot only provides the contact id so we have to request the contact details separately
contact_data = get_contact_properties(object_id)
except Exception as ex:
logging.error(f'Could not retrieve contact information for ID: {object_id} due to the following error: {ex}')
return
try:
firstname = contact_data["firstname"]
lastname = contact_data["lastname"]
email = contact_data["email"]
emailoctopus_contact = add_or_update_emailoctopus_contact(Config.EMAIL_OCTOPUS_MASTER_LIST_ID, email, firstname, lastname, [], 'subscribed')
if subscription_type == "contact.propertyChange":
tags_to_add = []
for tag in contact_data["tags"]:
if tag not in emailoctopus_contact["tags"]:
tags_to_add.append(tag)
# Now we must cycle through all the tags in order to see if any must be removed since we don't know what tags were added or removed in hubspot
tags_to_remove = []
for tag in emailoctopus_contact["tags"]:
if tag not in contact_data["tags"]:
tags_to_remove.append(tag)
updated_contact_tags = {tag: True for tag in tags_to_add}
updated_contact_tags.update({tag: False for tag in tags_to_remove})
add_or_update_emailoctopus_contact(Config.EMAIL_OCTOPUS_MASTER_LIST_ID, email, firstname, lastname, updated_contact_tags, 'subscribed')
else:
logging.error(f'Unsupported subscription type: {subscription_type}')
except Exception as ex:
logging.error(f"Error processing event {event}: {ex}")
for event in body:
if not isinstance(event, dict):
logging.warning(f"Skipping non-dict event: {event}")
continue
executor.submit(process_event, event)
return jsonify({"status": "success", "message": "Webhook request received and signature verified"}), 200
# Get the list of available organ name / CURIE pairs
@app.route("/get-organ-curies/")
def get_available_uberonids():
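    """Query SciCrunch for the organ name / CURIE pairs available for the requested species."""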
species = request.args.getlist('species')
requestBody = create_request_body_for_curies(species)
result = {}
try:
response = requests.post(
f'{Config.SCI_CRUNCH_HOST}/_search?api_key={Config.KNOWLEDGEBASE_KEY}',
json=requestBody)
result = reform_curies_results(response.json())
except BaseException as ex:
logging.error("Failed getting Uberon IDs", ex)
return {
"message": "Could not parse SciCrunch output, please try again later",
"error": "BaseException"
}, 502
return jsonify(result)
# Get the list of terms one level up/down from the given query term
@app.route("/get-related-terms/")
def get_related_terms(query):
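    """Return terms related to the given term from the SciCrunch SciGraph neighbors endpoint.

    Defaults to outgoing BFO:0000050 ('part of') relationships with entailment enabled;
    direction, relationshipType, and entail can be overridden via query parameters.
    """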
payload = {
'direction': request.args.get('direction', default='OUTGOING'),
'relationshipType': request.args.get('relationshipType', default='BFO:0000050'),
'entail': request.args.get('entail', default='true'),
'api_key': Config.KNOWLEDGEBASE_KEY
}
result = {}
try:
response = requests.get(
f'{Config.SCI_CRUNCH_SCIGRAPH_HOST}/graph/neighbors/{query}',
params=payload)
result = reform_related_terms(response.json())
except BaseException as ex:
logging.error(f"Failed getting related terms with payload {payload}", ex)
return {
"message": "Could not parse SciCrunch output, please try again later",
"error": "BaseException"
}, 502
return jsonify(result)
@app.route("/simulation_ui_file/")
def simulation_ui_file(identifier):
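    """Find the abi-simulation-file entry for the dataset with the given Pennsieve
    identifier in SciCrunch and return a direct S3 download URL for its simulation
    UI JSON file.
    """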
results = process_results(dataset_search(create_pennsieve_identifier_query(identifier)))
results_json = json.loads(results.data)
try:
item = results_json["results"][0]
uri = item["s3uri"]
path = item["abi-simulation-file"][0]["dataset"]["path"]
key = re.sub(r"s3://[^/]*/", "", f"{uri}files/{path}")
s3_bucket_name = re.sub(r"s3://|/.*", "", uri)
return jsonify(json.loads(direct_download_url(key, s3_bucket_name)))
except Exception:
abort(404, description="no simulation UI file could be found")
@app.route("/pmr_file", methods=["POST"])
def pmr_file():
data = request.get_json()
if data and "path" in data:
try:
resp = requests.post(f"{Config.PMR_HOST}/{data['path']}")
if resp.status_code == 200:
return base64.b64encode(resp.content)
else:
return resp.json()
        except Exception:
abort(400, description="invalid path")
else:
abort(400, description="missing path")
@app.route("/start_simulation", methods=["POST"])
def start_simulation():
data = request.get_json()
if data and "solver" in data and "name" in data["solver"] and "version" in data["solver"]:
return json.dumps(do_start_simulation(data))
else:
abort(400, description="Missing solver name and/or solver version")
@app.route("/check_simulation", methods=["POST"])
def check_simulation():
data = request.get_json()
if data and "job_id" in data and "solver" in data and "name" in data["solver"] and "version" in data["solver"]:
return json.dumps(do_check_simulation(data))
else:
abort(400, description="Missing solver name, solver version and/or job id")
@app.route("/pmr_latest_exposure", methods=["POST"])
def pmr_latest_exposure():
data = request.get_json()
if data and "workspace_url" in data:
try:
resp = requests.get(data["workspace_url"],
headers={"Accept": "application/vnd.physiome.pmr2.json.1"})
if resp.status_code == 200:
try:
# Return the latest exposure for the given workspace.
url = resp.json()["collection"]["items"][0]["links"][0]["href"]
                except Exception:
# There is no latest exposure for the given workspace.
url = ""
return jsonify(
url=url
)
else:
return resp.json()
        except Exception:
abort(400, description="Invalid workspace URL")
else:
abort(400, description="Missing workspace URL")
@app.route("/onto_term_lookup")
def find_by_onto_term():
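    """Look up an ontology term by name in SciCrunch's InterLex index and return its
    source record, or {'label': 'not found'} when there is not exactly one hit.
    """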
term = request.args.get('term')
headers = {
'Accept': 'application/json',
}
params = {
"api_key": Config.KNOWLEDGEBASE_KEY
}
query = create_onto_term_query(term)
try:
response = requests.get(f'{Config.SCI_CRUNCH_INTERLEX_HOST}/_search', headers=headers, params=params, json=query)
results = response.json()
hits = results['hits']['hits']
total = results['hits']['total']
if total == 1:
result = hits[0]
json_data = result['_source']
else:
json_data = {'label': 'not found'}
return json_data
except Exception as ex:
logging.error("An error occured while fetching from SciCrunch", ex)
return abort(500)
@app.route("/dataset_citations/", methods=["GET"])
def get_dataset_citations(dataset_id):
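    """Return the citation record for the given dataset from the SciCrunch citations
    index, or {'dataset id': 'not found'} when there is not exactly one hit.
    """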
headers = {
'Accept': 'application/json',
}
params = {
"api_key": Config.KNOWLEDGEBASE_KEY
}
query = create_citations_query(dataset_id)
try:
response = requests.get(f'{Config.SCI_CRUNCH_CITATIONS_HOST}/_search', headers=headers, params=params, json=query)
results = response.json()
hits = results['hits']['hits']
total = results['hits']['total']['value']
if total == 1:
result = hits[0]
json_data = result['_source']
else:
json_data = {'dataset id': 'not found'}
return json_data
except Exception as ex:
logging.error("An error occured while fetching from SciCrunch", ex)
return jsonify({ 'message': f"An error occured while fetching citation info for dataset {dataset_id} from SciCrunch" }), 500
@app.route("/total_dataset_citations", methods=["GET"])
def get_total_dataset_citations():
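    """Return the total number of dataset citations by running a zero-size match_all
    query against the SciCrunch citations index and summing the doc_count of every
    bucket in the 'citations.type' terms aggregation.
    """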
headers = {
'Accept': 'application/json',
}
params = {
"api_key": Config.KNOWLEDGEBASE_KEY
}
query = {
"size": 0,
"from": 0,
"query": { "match_all": {} },
"aggregations": {
"Citations": {
"terms": {
"field": "citations.type"
}
}
}
}
try:
response = requests.get(f'{Config.SCI_CRUNCH_CITATIONS_HOST}/_search', headers=headers, params=params, json=query)
results = response.json()
buckets = results['aggregations']['Citations']['buckets']
total = sum(bucket["doc_count"] for bucket in buckets)
return jsonify({ 'total_citations': total }), 200
except Exception as ex:
logging.error("An error occured while fetching total citations from SciCrunch", ex)
return jsonify({ 'total_citations': -1, 'message': "An error occured while fetching total citations from SciCrunch" }), 500
@app.route("/search-readme/", methods=["GET"])
def search_readme(query):
url = 'https://dash.readme.com/api/v1/docs/search?search=' + query
headers = {'Authorization': 'Basic ' + Config.README_API_KEY}
try:
response = requests.post(
url=url,
headers=headers
)
return response.json()
    except requests.exceptions.RequestException as err:
logging.error(err)
return {
"error": str(err),
"message": "Readme is not currently reachable, please try again later"
}, 502
@app.route("/metrics", methods=["GET"])
def metrics():
return usage_metrics
# Callback endpoint for the Contentful webhook that is triggered when an event entry is updated in Contentful
@app.route("/event_updated", methods=["POST"])
def event_updated():
    # The webhook secret key is configured with the same value as the CDA access token; if the CDA access token is updated then this must be updated as well
secret_key = request.headers.get('event_updated_secret_key')
if secret_key != Config.CTF_CDA_ACCESS_TOKEN:
abort(403, description=f'Invalid secret key: {secret_key}')
else:
event = request.get_json()
if event:
try:
return update_event_sort_order(event)
            except Exception:
abort(400, description=f'Invalid event data: {event}')
else:
abort(400, description="Missing event data")
@app.route("/all_dataset_ids", methods=["GET"])
@cache.cached(timeout=86400)
def all_dataset_ids():
    dataset_ids = get_all_dataset_ids()
    string_list = [str(element) for element in dataset_ids]
    delimiter = ", "
    return delimiter.join(string_list)
@app.route("/all_dataset_uuids", methods=["GET"])
@cache.cached(timeout=86400)
def all_dataset_uuids():
    dataset_uuids = get_all_dataset_uuids()
    string_list = [str(element) for element in dataset_uuids]
    delimiter = ", "
    return delimiter.join(string_list)
@app.route("/total_protocol_views")
@cache.cached(timeout=180)
def get_total_protocol_views():
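    """Return the precomputed total protocol view count from the protocol metrics table.
    Responds with 202 while the total has not yet been calculated.
    """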
table_state = get_protocol_metrics_table_state(protocolMetricsTable)
if table_state is None or not table_state.get("total_protocol_views"):
return jsonify({
"total_views": None,
"message": "Total views not yet calculated."
}), 202
total_protocol_views = table_state.get("total_protocol_views")
return jsonify({"total_views": total_protocol_views}), 200
@app.route("/contact_support", methods=["POST"])
def contact_support():
data = request.get_json()
name = data.get("name")
email = data.get("email")
message = data.get("message")
subject = data.get("subject", "SPARC Form Submission Confirmation")
if not name or not email or not message:
return jsonify({"error": "Missing required fields"}), 400
if email:
try:
email_sender.mailersend_email(Config.SES_SENDER,
email,
subject,
feedback_email.substitute({'message': message}),
Config.SERVICES_EMAIL)
except Exception as e:
return jsonify({"message": "Confirmation email sent to user unsuccessful."}), 500
return jsonify({"message": "Message received successfully."}), 200