import requests
import os
import io
from datetime import datetime
from azure.storage.blob import ContainerClient, ContentSettings
# --- API configuration (fill these in before use) ---
workspace_key = ''   # sent as the 'Api-Key' header
tenant_key = ''      # sent as the 'Ocp-Apim-Subscription-Key' header
workspace_name = ''  # default workspace name, e.g. 'dws-1-ACRONYM'
# Base URL of the API (e.g. 'https://host'). Every request helper below
# builds its URL as f"{BASE_URL}{endpoint}"; this constant was referenced
# throughout the file but never defined, causing a NameError at request time.
BASE_URL = ''
def getHeaders():
    """Builds the authentication headers for every API request.

    Returns:
        dict: 'Api-Key' (workspace key) and 'Ocp-Apim-Subscription-Key'
        (tenant key), read from the module-level configuration constants.
    """
    return {
        'Api-Key': workspace_key,
        'Ocp-Apim-Subscription-Key': tenant_key
    }
def _make_request(method, endpoint, data=None):
    """Sends a single API request and returns the validated response.

    Args:
        method: HTTP verb, e.g. 'GET' or 'POST'.
        endpoint: Path appended to BASE_URL, e.g. '/api/ping'.
        data: Optional payload, serialized as the JSON request body.

    Returns:
        requests.Response: The successful response.

    Raises:
        requests.HTTPError: If the server returns an error status code.
    """
    full_url = f"{BASE_URL}{endpoint}"
    resp = requests.request(method, full_url, headers=getHeaders(), json=data)
    resp.raise_for_status()
    return resp
def test_connection():
    """Pings the API to verify connectivity and credentials.

    Returns:
        requests.Response: The response from the ping endpoint.
    """
    endpoint = '/api/ping'
    return _make_request('GET', endpoint)
def list_workspace_containers(workspace_name):
    """Lists the upload containers that exist in a workspace.

    Args:
        workspace_name: The name of the workspace (e.g., 'dws-1-ACRONYM').

    Returns:
        requests.Response: The API response with the container listing.
    """
    return _make_request(
        'GET', f"/api/workspace/{workspace_name}/files/containers"
    )
def create_workspace_container(workspace_name, title=None):
    """Creates an upload container for a specified workspace.

    Args:
        workspace_name: The name of the workspace (e.g., 'dws-1-ACRONYM').
        title: An optional title for the container; it is prefixed with a
            timestamp. When omitted, the timestamp alone is used.

    Returns:
        str: The location of the created container (from the response header).

    Raises:
        requests.HTTPError: If the server returns an error status code.
    """
    timestamp = f'{datetime.now():%Y-%m-%d %H:%M:%S}'
    # Bug fix: the original did `timestamp + ' ' + title` unconditionally,
    # which raised TypeError when title was None (despite being optional).
    full_title = f'{timestamp} {title}' if title else timestamp
    endpoint = f"/api/workspace/{workspace_name}/files/containers"
    url = f"{BASE_URL}{endpoint}"
    # Title is passed as a query parameter.
    response = requests.post(url, headers=getHeaders(), params={'title': full_title})
    response.raise_for_status()
    # The API reports the new container's location in a response header.
    return response.headers['Location']
def delete_workspace_container(workspace_name, container_location):
    """Deletes an upload container in the specified workspace.

    Args:
        workspace_name: The name of the workspace (e.g., 'dws-1-ACRONYM').
        container_location: The location/identifier of the container, as
            returned by create_workspace_container.

    Returns:
        requests.Response on success, or the string 'Create container first'
        when container_location is empty/None (kept for backward
        compatibility with existing callers).

    Raises:
        requests.HTTPError: If the server returns an error status code.
    """
    if not container_location:
        # Plain string: the original used an f-string with no placeholders.
        return 'Create container first'
    # The identifier is the last path segment of the container location.
    container_identifier = container_location.rsplit('/', 1)[-1]
    endpoint = f"/api/workspace/{workspace_name}/files/containers/{container_identifier}"
    url = f"{BASE_URL}{endpoint}"
    response = requests.delete(url, headers=getHeaders())
    response.raise_for_status()
    return response
def commit_workspace_container(workspace_name, container_location):
    """Commits changes to an upload container.

    Args:
        workspace_name: The name of the workspace.
        container_location: The location/identifier of the container, as
            returned by create_workspace_container. (The original docstring
            documented a nonexistent 'container_identifier' parameter.)

    Returns:
        requests.Response on success, or the string 'Create container first'
        when container_location is empty/None (guard added for consistency
        with delete_workspace_container; previously None crashed with
        AttributeError).

    Raises:
        requests.HTTPError: If the server returns an error status code.
    """
    if not container_location:
        return 'Create container first'
    # The identifier is the last path segment of the container location.
    container_identifier = container_location.rsplit('/', 1)[-1]
    endpoint = f"/api/workspace/{workspace_name}/files/containers/{container_identifier}"
    url = f"{BASE_URL}{endpoint}"
    response = requests.patch(url, headers=getHeaders())
    response.raise_for_status()
    # The response may carry useful data on a successful commit.
    return response
def upload_text_to_container(container, text_data, filename='my_text.txt'):
    """Uploads text data to a container using a container (SAS) URL.

    Args:
        container: An Azure Blob Storage container URL.
        text_data: The text content to be uploaded (encoded as UTF-8).
        filename: The desired blob name within the container.
    """
    container_client = ContainerClient.from_container_url(container)
    blob_client = container_client.get_blob_client(filename)
    # overwrite=True for consistency with the other upload helpers in this
    # file; without it a second upload of the same filename raises
    # ResourceExistsError.
    blob_client.upload_blob(
        text_data.encode('utf-8'),
        content_settings=ContentSettings(content_type='text/plain'),
        overwrite=True
    )
def upload_dataframe_to_azure(container, df, filename='data.csv'):
    """Uploads a pandas DataFrame as a CSV blob using a container (SAS) URL.

    (Docstring corrected: the original was copy-pasted from the text
    uploader and described the wrong operation.)

    Args:
        container: An Azure Blob Storage container URL.
        df: pandas DataFrame to serialize (written without the index).
        filename: The desired blob name within the container.
    """
    # Serialize the frame to an in-memory CSV, then encode for Blob Storage.
    csv_buffer = io.StringIO()
    df.to_csv(csv_buffer, index=False)
    csv_data = csv_buffer.getvalue().encode('utf-8')
    container_client = ContainerClient.from_container_url(container)
    blob_client = container_client.get_blob_client(filename)
    blob_client.upload_blob(csv_data, overwrite=True)
def upload_file_to_azure(container, local_file_path):
    """Uploads a local file to a container using a container (SAS) URL.

    (Docstring corrected: the original was copy-pasted from the text
    uploader and documented parameters this function does not have.)

    Args:
        container: An Azure Blob Storage container URL.
        local_file_path: Path to the local file to upload; its basename is
            used as the blob name.

    Note:
        Best-effort: any exception is printed and swallowed rather than
        re-raised, matching the original behavior.
    """
    try:
        # The blob is named after the local file.
        file_name = os.path.basename(local_file_path)
        container_client = ContainerClient.from_container_url(container)
        with open(local_file_path, "rb") as file_to_upload:
            result = container_client.upload_blob(file_name, file_to_upload, overwrite=True)
        print(f"File '{file_name}' uploaded successfully. \n{result=} \nto {container=}")
    except Exception as ex:
        print(f"Exception: {ex}")