import pytest
from app import app
from app.config import Config
import requests
import random
import string
import hmac
import hashlib
import base64
import time
import json
from requests.auth import HTTPBasicAuth
@pytest.fixture
def client():
# Spin up test flask app
app.config['TESTING'] = True
return app.test_client()
def test_direct_download_url_small_file(client):
small_s3_file = '217/files/derivative/brainstem_pig_metadata.json'
r = client.get(f"/s3-resource/{small_s3_file}?s3BucketName=prd-sparc-discover50-use1")
assert r.status_code == 200
assert b"medulla" in r.data
def test_direct_download_url_new_bucket_file(client):
new_s3_file = '307%2Ffiles%2Fderivative%2Fhuman_body_metadata.json'
r = client.get(f"/s3-resource/{new_s3_file}?s3BucketName=prd-sparc-discover50-use1")
assert r.status_code == 200
assert b"colon" in r.data
def test_direct_download_url_thumbnail(client):
small_s3_file = '95/files/derivative%2FcolonHuman_Layout1_thumbnail.jpeg'
r = client.get(f"/s3-resource/{small_s3_file}?s3BucketName=prd-sparc-discover50-use1")
assert r.status_code == 200
assert b"\xFF\xD8\xFF" in r.data
def test_direct_download_incorrect_path(client):
incorrect_path = '95/files/?encodeBase64=true'
r = client.get(f"/s3-resource/{incorrect_path}?s3BucketName=prd-sparc-discover50-use1")
assert r.status_code != 200
def test_direct_download_empty_path(client):
r = client.get(f"/s3-resource/")
assert r.status_code == 404
def test_direct_download_url_large_file(client):
large_s3_file = '61%2Ffiles%2Fprimary%2Fsub-44%2Fsam-1%2Fmicroscopy%2Fsub-44sam-1C44-1Slide2p2MT_10x.nd2'
r = client.get(f"/s3-resource/{large_s3_file}")
assert r.status_code == 413
assert 'File too big to download' in r.get_data().decode()
def test_get_owner_email(client):
# SPARC Portal user info
portal_user_id = 729
portal_user_email = 'nih-data-portal@blackfynn.com'
r = client.get(f"/get_owner_email/{portal_user_id}")
assert r.status_code == 200
assert r.get_json()['email'] == portal_user_email
r = client.get(f"/get_owner_email/{999999}")
assert r.status_code == 404
def test_get_datasets_by_project(client):
# SPARC Portal project info
portal_project_id = 'OT2OD025340'
r = client.get(f"/project/{999999}")
assert r.status_code == 404
r = client.get(f"/project/{portal_project_id}")
assert r.status_code == 200
def test_map_get_share_id_and_state(client):
# mock json for testing
test_data = { "state" : { "type" : "scaffold", "value": 1234 } }
r = client.post(f"/map/getshareid", json = {})
assert r.status_code == 400
r = client.post(f"/map/getshareid", json = test_data)
assert r.status_code == 200
assert "uuid" in r.get_json()
r = client.post(f"/map/getstate", json = r.get_json())
assert r.status_code == 200
returned_data = r.get_json()
assert "state" in returned_data
assert returned_data["state"]["type"] == "scaffold"
assert returned_data["state"]["value"] == 1234
r = client.post(f"/map/getstate", json = {"uuid": "1234567"})
assert r.status_code == 400
r = client.post(f"/map/getstate", json = {})
assert r.status_code == 400
def test_scaffold_get_share_id_and_state(client):
# mock json for testing
testData = { "state" : { "far" : 12.32, "near": 0.23 } }
r = client.post(f"/scaffold/getshareid", json = {})
assert r.status_code == 400
r = client.post(f"/scaffold/getshareid", json = testData)
assert r.status_code == 200
assert "uuid" in r.get_json()
r = client.post(f"/scaffold/getstate", json = r.get_json())
assert r.status_code == 200
returned_data = r.get_json()
assert "state" in returned_data
assert returned_data["state"]["far"] == 12.32
assert returned_data["state"]["near"] == 0.23
r = client.post(f"/scaffold/getstate", json = {"uuid": "1234567"})
assert r.status_code == 400
r = client.post(f"/scaffold/getstate", json = {})
assert r.status_code == 400
def test_create_wrike_task(client):
r = client.post(f"/tasks", data={"title": "test-integration-task-sparc-api"})
assert r.status_code == 400
r2 = client.post(f"/tasks", data={"description": "test-integration-task-sparc-api
Here is a small text but not lorem ipsum"})
assert r2.status_code == 400
r3 = client.post(f"/tasks", data={"title": "test-integration-task-sparc-api", "description": "test-integration-task-sparc-api
Here is a small text but not lorem ipsum"})
assert r3.status_code == 200
# this part is only for cleaning the wrike board
returned_data = r3.get_json()
task_id = returned_data["task_id"]
url = 'https://www.wrike.com/api/v4/tasks/{}'.format(task_id)
hed = {'Authorization': 'Bearer ' + Config.WRIKE_TOKEN}
resp = requests.delete(
url=url,
headers=hed
)
assert resp.status_code == 200
def test_hubspot_webhook(client):
http_method = "POST"
endpoint = "/hubspot_webhook"
base_url = "http://localhost" # Default for Flask test client
full_url = f"{base_url}{endpoint}"
# mock a property changed event firing for test Hubspot contact
mock_body = '[{"subscriptionType":"contact.propertyChange","objectId":"83944215465"}]'
# The timestamp must be a Unix epoch time within 5 minutes (300 seconds) of the current time when the webhook request is received.
valid_timestamp = int(time.time())
# Concatenate the string as HubSpot does
data_to_sign = f'{http_method}{full_url}{mock_body}{valid_timestamp}'
# Generate the HMAC SHA256 signature
signature = hmac.new(
key=Config.HUBSPOT_CLIENT_SECRET.encode('utf-8'),
msg=data_to_sign.encode('utf-8'),
digestmod=hashlib.sha256
).digest()
# Encode the signature in Base64
mock_signature = base64.b64encode(signature).decode()
# Send a mock POST request
response = client.post(
endpoint,
json=mock_body,
headers={
"Content-Type": "application/json",
"X-HubSpot-Signature-Version": "v3",
"X-Hubspot-Signature-v3": mock_signature,
"X-HubSpot-Request-Timestamp": str(valid_timestamp),
}
)
assert response.status_code == 204
def test_subscribe_to_mailchimp(client):
r = client.post(f"/mailchimp_subscribe", json={})
assert r.status_code == 400
letters = string.ascii_lowercase
email = ''.join(random.choice(letters) for i in range(8))
domain = ''.join(random.choice(letters) for i in range(6))
email_address = '{}@{}.com'.format(email, domain)
r2 = client.post(f"/mailchimp_subscribe", json={"email_address": email_address, "first_name": "Test", "last_name": "User"})
assert r2.status_code == 200
# this part is only for cleaning the mailchimp list and not pollute the mailing list
returned_data = r2.get_json()
member_hash = returned_data["id"]
url = 'https://us2.api.mailchimp.com/3.0/lists/c81a347bd8/members/{}/actions/delete-permanent'.format(member_hash)
auth = HTTPBasicAuth('AnyUser', Config.MAILCHIMP_API_KEY)
resp = requests.post(
url=url,
auth=auth
)
assert resp.status_code == 204
def test_osparc_viewers(client):
r = client.get('/get_osparc_data')
assert r.status_code == 200
osparc_data = r.get_json()
assert 'file_viewers' in osparc_data
def test_sim_dataset(client):
r = client.get('/sim/dataset/0')
assert r.status_code == 404
def test_onto_term_lookup(client):
r = client.get('/onto_term_lookup', query_string={'term': 'http://purl.obolibrary.org/obo/NCBITaxon_9606'})
assert r.status_code == 200
json_data = r.get_json()
assert json_data['label'] == 'Human'
def test_non_existing_simulation_ui_file(client):
r = client.get('/simulation_ui_file/137')
assert r.status_code == 404
def test_simulation_ui_file_old_s3_bucket(client):
r = client.get('/simulation_ui_file/135')
assert r.status_code == 200
assert r.get_json()['simulation']['solvers'][0]['name'] == 'simcore/services/comp/opencor'
def test_simulation_ui_file_new_s3_bucket(client):
r = client.get('/simulation_ui_file/308')
assert r.status_code == 200
assert r.get_json()['simulation']['solvers'][0]['name'] == 'simcore/services/comp/kember-cardiac-model'
def test_get_featured_datasets(client):
r = client.get('/get_featured_datasets_identifiers')
assert r.status_code == 200
json = r.get_json()
assert 'identifiers' in json
assert type(json['identifiers']) == list