import requests
import json
# ---------------------------------------------------------------------------
# Configuration
#
# Every value below is a placeholder; substitute the details of your own
# instance, token, project, team and organization before running the script.
# ---------------------------------------------------------------------------
BASE_URL = "https://your-instance.infactory.ai/api/infactory"
AUTH_TOKEN = "your-auth-token"
PROJECT_ID = "your-project-id"
TEAM_ID = "your-team-id"
ORG_ID = "your-org-id"

# Headers shared by the HTTP calls in this script. The token is placed in the
# Authorization header exactly as given -- no scheme prefix is added here.
headers = {
    "Content-Type": "application/json",
    "Authorization": AUTH_TOKEN,
}
# Step 1: Test the connection
# Describes the upstream HTTP endpoint to probe: its URL, method, a single
# required "key" parameter, and the path used to extract data from the reply.
test_payload = {
    "url": "https://api-endpoint.example.com/data",
    "method": "GET",
    "parameters": {
        "key": {
            "value": "your-api-key-value",
            "required": True,
        },
    },
    "parameterGroups": [],
    "authType": "None",
    "auth": {},
    "responsePathExtractor": "value",
}

try:
    test_response = requests.post(
        f"{BASE_URL}/v1/http/test-connection",
        headers=headers,
        data=json.dumps(test_payload),
        timeout=30,  # without a timeout requests may block indefinitely
    )
except requests.RequestException as exc:
    # Network-level failure (DNS, refused connection, timeout, ...).
    print("Connection test failed:", exc)
    raise SystemExit(1) from exc

# The test passes only on HTTP 200 with a truthy "success" field. Using
# .get() makes a body without that key count as a failure instead of raising
# KeyError.
connection_ok = (
    test_response.status_code == 200
    and test_response.json().get("success")
)
if not connection_ok:
    print("Connection test failed:", test_response.text)
    # SystemExit is used directly because the exit() helper is injected by the
    # site module and is not guaranteed to exist in every interpreter setup.
    raise SystemExit(1)

print("Connection test successful!")
# Step 2: Create a datasource
# NOTE(review): the datasource is created directly with the status
# "transformation_started" -- confirm this is what the API expects for a new
# "http-requests" datasource.
datasource_payload = {
    "name": "My HTTP Connection",
    "project_id": PROJECT_ID,
    "type": "http-requests",
    "status": "transformation_started",
}

try:
    datasource_response = requests.post(
        f"{BASE_URL}/v1/datasources",
        headers=headers,
        data=json.dumps(datasource_payload),
        timeout=30,  # without a timeout requests may block indefinitely
    )
except requests.RequestException as exc:
    # Network-level failure (DNS, refused connection, timeout, ...).
    print("Failed to create datasource:", exc)
    raise SystemExit(1) from exc

if datasource_response.status_code != 200:
    print("Failed to create datasource:", datasource_response.text)
    # SystemExit is used directly because the exit() helper is injected by the
    # site module and is not guaranteed to exist in every interpreter setup.
    raise SystemExit(1)

# The new datasource's ID is needed by the later steps of the script.
datasource_id = datasource_response.json()["id"]
print(f"Created datasource with ID: {datasource_id}")
# Step 3: Create credentials
# The endpoint settings are sent twice, once under "metadata" and once under
# "config", with identical contents. The credentials are attached to the
# datasource created in the previous step via "datasource_id".
credentials_payload = {
    "name": "API Credentials",
    "type": "api",
    "description": "Credentials for API connection",
    "metadata": {
        "url": "https://api-endpoint.example.com/data",
        "method": "GET",
        "headers": {},
        "auth": {},
    },
    "datasource_id": datasource_id,
    "team_id": TEAM_ID,
    "organization_id": ORG_ID,
    "config": {
        "url": "https://api-endpoint.example.com/data",
        "method": "GET",
        "headers": {},
        "auth": {},
    },
}

try:
    credentials_response = requests.post(
        f"{BASE_URL}/v1/credentials",
        headers=headers,
        data=json.dumps(credentials_payload),
        timeout=30,  # without a timeout requests may block indefinitely
    )
except requests.RequestException as exc:
    # Network-level failure (DNS, refused connection, timeout, ...).
    print("Failed to create credentials:", exc)
    raise SystemExit(1) from exc

if credentials_response.status_code != 200:
    print("Failed to create credentials:", credentials_response.text)
    # SystemExit is used directly because the exit() helper is injected by the
    # site module and is not guaranteed to exist in every interpreter setup.
    raise SystemExit(1)

# Reaching this line means the request succeeded; no else branch is needed
# because the failure path above always terminates the script.
print("Created credentials successfully")
# Step 4: Execute the HTTP request
# The execute payload is the connection-test payload extended with the project
# and datasource IDs, plus a "connect_spec" that embeds the same settings
# again under "config".
execute_payload = {
    **test_payload,
    "project_id": PROJECT_ID,
    "datasource_id": datasource_id,
    "connect_spec": {
        "name": "My HTTP Connection",
        "id": "http-requests",
        "config": {
            **test_payload,
            # Restated explicitly; test_payload already carries this key.
            "responsePathExtractor": "value",
        },
    },
}

try:
    execute_response = requests.post(
        f"{BASE_URL}/v1/http/execute-request",
        headers=headers,
        data=json.dumps(execute_payload),
        # Bounded so the script cannot hang forever. NOTE(review): 120 s is a
        # guess at how long execution may take server-side -- tune as needed.
        timeout=120,
    )
except requests.RequestException as exc:
    # Network-level failure (DNS, refused connection, timeout, ...).
    print("Failed to execute HTTP request:", exc)
    raise SystemExit(1) from exc

if execute_response.status_code != 200:
    print("Failed to execute HTTP request:", execute_response.text)
    # SystemExit is used directly because the exit() helper is injected by the
    # site module and is not guaranteed to exist in every interpreter setup.
    raise SystemExit(1)

# The response body is read for a "data_object_id" value and a "jobs"
# collection, both of which are reported below.
result = execute_response.json()
print("Successfully executed HTTP request!")
print(f"Data object ID: {result['data_object_id']}")
print(f"Jobs: {len(result['jobs'])} jobs created")