This script allows Airflow to integrate with Salesforce. Currently supported:
- Copy Salesforce Object records to a CSV file on Amazon S3 based on a SOQL query,
- Insert, update, or delete Salesforce Object records based on a CSV file on Amazon S3.
Copy the Python file to your Airflow plugins folder. Your Airflow instance must have the
simple-salesforce package installed, e.g. via pip install simple-salesforce.
The connection must have the following parameters:
Conn Type: HTTP
Host: your Salesforce instance URL
Login: your email login
Password: your password
Extra: {"security_token": "your-security-token"}
The example DAG below dumps the Id of every current record in the Salesforce Account Object:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.salesforce_utils import SalesforceToS3Operator

dag = DAG(
    dag_id="dump_account",
    schedule_interval=timedelta(days=1),
    start_date=datetime(2018, 12, 11, 17, 0),
)

# Jinja template; Airflow renders it to the execution date (YYYY-MM-DD) at runtime.
ds = "{{ ds }}"

with dag:
    unload_account = SalesforceToS3Operator(
        task_id="unload_account",
        sql="select Id from Account",
        dest_key=f"airflow/unload_account_{ds}",  # e.g. airflow/unload_account_2018-12-11
        dest_bucket="aws",
        salesforce_conn_id="salesforce",
        aws_conn_id="aws",
        include_deleted=False,  # skip records in the Recycle Bin
    )
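When the DAG runs, dest_key renders to a dated key such as
airflow/unload_account_2018-12-11, so each daily run writes its own CSV.

The reverse direction (loading a CSV from S3 into Salesforce) follows the same shape.
The sketch below is illustrative only: the operator name S3ToSalesforceOperator and its
parameters are assumptions, not names confirmed by this plugin.

with dag:
    # Hypothetical sketch: operator name and parameters are assumptions,
    # shown only to suggest the shape of the S3-to-Salesforce direction.
    upsert_account = S3ToSalesforceOperator(
        task_id="upsert_account",
        source_key=f"airflow/upsert_account_{ds}",  # CSV file on S3
        source_bucket="aws",
        object_name="Account",   # target Salesforce Object
        operation="update",      # insert, update, or delete
        salesforce_conn_id="salesforce",
        aws_conn_id="aws",
    )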