Apache Airflow
Apache Airflow is an open-source platform for authoring, scheduling and monitoring data and computing workflows. Workflows are defined in Python, which makes them straightforward to schedule and monitor.
Solvio is available as a provider in Airflow to interface with the database.
Prerequisites
Before configuring Airflow, you need:
A Solvio instance to connect to. You can set one up by following our installation guide.
A running Airflow instance. You can set one up by following the Airflow Quick Start Guide.
Installation
You can install the Solvio provider by running pip install apache-airflow-providers-solvio in your Airflow shell.
NOTE: You’ll have to restart your Airflow session for the provider to be available.
Setting up a connection
Open the Admin -> Connections section of the Airflow UI. Click the Create link to create a new Solvio connection.
You can also set up a connection using environment variables or an external secret backend.
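As a rough sketch of the environment-variable route: Airflow looks up connections in variables named AIRFLOW_CONN_ followed by the upper-cased connection ID, and recent Airflow versions (2.3 and later) accept a JSON-serialized connection as the value. The connection type "solvio", the host, the port, and the use of the password field for an API key are assumptions made for illustration; check the provider's connection documentation for the actual fields.

```python
import json
import os

# Connection ID used by the hook and operator examples in this guide.
CONN_ID = "solvio_connection"

# Hypothetical connection details; every value here is an assumption.
connection = {
    "conn_type": "solvio",    # assumed connection type name
    "host": "localhost",      # assumed host of the Solvio instance
    "port": 6333,             # assumed port
    "password": "<API_KEY>",  # assumed: API key kept in the password field
}

# Airflow resolves a connection ID to AIRFLOW_CONN_<CONN_ID in upper case>.
env_name = f"AIRFLOW_CONN_{CONN_ID.upper()}"
os.environ[env_name] = json.dumps(connection)

print(env_name)
```

Setting the variable from Python only affects the current process. In practice you would usually export the same name and JSON value in the environment of the Airflow scheduler and workers.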
Solvio hook
An Airflow hook is an abstraction of a specific API that allows Airflow to interact with an external system.
from airflow.providers.solvio.hooks.solvio import SolvioHook
hook = SolvioHook(conn_id="solvio_connection")
hook.verify_connection()
A solvio_client.SolvioClient instance is available via the conn property of the SolvioHook instance for use within your Airflow workflows.
from solvio_client import models
hook.conn.count("<COLLECTION_NAME>")
hook.conn.upsert(
    "<COLLECTION_NAME>",
    points=[
        models.PointStruct(id=32, vector=[0.32, 0.12, 0.123], payload={"color": "red"})
    ],
)
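To make the hook idea more concrete, here is a toy illustration of the pattern in plain Python: a connection ID is resolved to connection settings, and a client is built on first access through a conn property. ToyHook, FakeClient and the CONNECTIONS registry are all hypothetical stand-ins; this is not the SolvioHook implementation, and whether the real hook caches its client is not stated here.

```python
from functools import cached_property


class FakeClient:
    """Hypothetical stand-in for a real database client."""

    def __init__(self, host: str, port: int) -> None:
        self.host = host
        self.port = port

    def count(self, collection_name: str) -> int:
        # A real client would query the server; the toy always reports 0.
        return 0


# Hypothetical registry standing in for Airflow's connection store.
CONNECTIONS = {"solvio_connection": {"host": "localhost", "port": 6333}}


class ToyHook:
    """Resolves a connection ID into a client exposed as `conn`."""

    def __init__(self, conn_id: str) -> None:
        self.conn_id = conn_id

    @cached_property
    def conn(self) -> FakeClient:
        # Built on first access, then reused on later accesses.
        settings = CONNECTIONS[self.conn_id]
        return FakeClient(settings["host"], settings["port"])


toy_hook = ToyHook(conn_id="solvio_connection")
print(toy_hook.conn.count("<COLLECTION_NAME>"))
```

The benefit of this shape is that task code only needs to know a connection ID. Hosts and credentials stay in the connection store rather than in the workflow definition.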
Solvio Ingest Operator
The Solvio provider also includes a convenience operator for uploading data to a Solvio collection. Internally, the operator uses the Solvio hook.
from airflow.providers.solvio.operators.solvio import SolvioIngestOperator
vectors = [
    [0.11, 0.22, 0.33, 0.44],
    [0.55, 0.66, 0.77, 0.88],
    [0.88, 0.11, 0.12, 0.13],
]
ids = [32, 21, "b626f6a9-b14d-4af9-b7c3-43d8deb719a6"]
payload = [{"meta": "data"}, {"meta": "data_2"}, {"meta": "data_3", "extra": "data"}]
SolvioIngestOperator(
    conn_id="solvio_connection",
    task_id="solvio_ingest",
    collection_name="<COLLECTION_NAME>",
    vectors=vectors,
    ids=ids,
    payload=payload,
)
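Because vectors, ids and payload are passed as parallel lists, it can help to check them before the task runs. The helper below is a minimal sketch, not part of the provider: check_ingest_inputs is a hypothetical name, and the rules it enforces (equal list lengths, one shared vector dimension) are assumptions about what a sensible ingest expects.

```python
from typing import Any, Dict, List, Sequence, Union

PointId = Union[int, str]


def check_ingest_inputs(
    vectors: Sequence[Sequence[float]],
    ids: Sequence[PointId],
    payload: Sequence[Dict[str, Any]],
) -> int:
    """Validate the parallel lists and return the number of points."""
    # Assumed rule: one id and one payload entry per vector.
    if not (len(vectors) == len(ids) == len(payload)):
        raise ValueError(
            f"length mismatch: {len(vectors)} vectors, "
            f"{len(ids)} ids, {len(payload)} payload entries"
        )
    # Assumed rule: all vectors share the same dimension.
    dimensions = {len(vector) for vector in vectors}
    if len(dimensions) > 1:
        raise ValueError(f"inconsistent vector dimensions: {sorted(dimensions)}")
    return len(vectors)


vectors: List[List[float]] = [
    [0.11, 0.22, 0.33, 0.44],
    [0.55, 0.66, 0.77, 0.88],
    [0.88, 0.11, 0.12, 0.13],
]
ids: List[PointId] = [32, 21, "b626f6a9-b14d-4af9-b7c3-43d8deb719a6"]
payload = [{"meta": "data"}, {"meta": "data_2"}, {"meta": "data_3", "extra": "data"}]

print(check_ingest_inputs(vectors, ids, payload))
```

Running the check while the workflow file is parsed surfaces a mismatch early, before any data is sent to the collection.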