Generic Airflow Transfers made easy

Jarek Potiuk
Published in Apache Airflow
Jul 16, 2022

If you’ve been using Airflow for a while, you have probably got used to writing your DAGs with Operators as their basic building blocks. This is what various tutorials and a myriad of examples tell you to do, and it works pretty well. However, you have likely struggled with how to use those simple operators to implement “transfer” tasks, for example when you want to copy an S3 file to GCS and apply some simple transformation along the way.

Generic Airflow Transfer

If “pretty well” is not “good enough” for you, Airflow 2 has a far bigger toolset for writing DAGs, and you can start thinking differently about how to express your intent.

Airflow Operators are really cool if you have one thing to do and one system to interface with, and they do a fantastic job there. However, when your task involves more than one external system, Operators seem to get in the way rather than help. A good example is Transfer Operators, used when you want to move data between a source and a destination.

Since the beginning of my Airflow involvement, it has always bothered me that in order to do the job, we had to create a dedicated operator for EACH combination of source and destination: for example GCSToSFTPOperator, GCSToHiveOperator, GCSToS3Operator, and also S3ToSFTPOperator, S3ToHiveOperator, S3ToGCSOperator, and so on. This sounds rather strange, and with the number of providers (i.e. integrations with external systems) growing, creating Transfer Operators to combine all sources and destinations becomes an O(n²) task (if you are familiar with big-O notation). And if you want to do even a simple transformation along the way, you are pretty much lost.

There were various approaches you could take here:

  • Use two operators as two steps in the DAG. That sounds pretty “natural” in the Airflow workflow. After all, most integrations already have some “download” and “upload” operators, and it should be easy to download in one task and upload in the other. If you are using the Local Executor, this might actually work: all tasks run on the same machine, so you can easily download a file to the “/tmp” directory in one operator and upload it in another. However, this solution does not really work in a more scalable deployment. With the Celery or Kubernetes executor, each task potentially runs on a different machine (or in a different Pod), and you cannot rely on tasks being able to share files easily.
  • Complete the discussion and implement the AIP-14 Airflow Improvement Proposal for composable operators. Unfortunately (or fortunately, depending on how you look at it), it never got past the initial discussion phase, and we will likely move it to the “abandoned” state, because adding another way to compose operators, when we already compose them in DAGs, does not seem right: it looks like an extra, unnecessary level of abstraction.
  • Use the Generic Transfer Operator. The Generic Transfer operator in Airflow Core implements just that, a transfer, provided that the “source” and “destination” Hooks implement a specific API (see the short sketch after this list). It is a much better approach, but the Generic Transfer Operator has limitations too. First, the Hooks have to follow a certain API, but more importantly, it is not possible to have a “streaming” transfer: you need to download the data fully from the source before you can send it to the destination. This is a huge limitation for any kind of transfer, as not only does it require local storage or memory, it also usually runs 2x slower than the equivalent streaming approach. You can do simple transformations there, but they can only run on the full dataset once the download has finished. And it is not a very popular approach, as only a handful of Hooks implement the required API.
  • We could implement a “Generic Streaming Transfer”, but that would require each of the Hooks to implement a complex abstract API with callbacks and in-memory passing of data from one Hook to another, and we would have to implement that API in many Hooks. Again, adding a new, completely disconnected abstraction layer on top of the already existing ones seems quite superfluous.
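For reference, a minimal sketch of the Generic Transfer Operator approach mentioned in the list above could look like this. The connection IDs, table name and query are hypothetical; it only works when the source Hook implements get_records() and the destination Hook implements insert_rows(), and the whole result set is fetched before anything is written to the destination:

```python
import pendulum
from airflow import DAG
from airflow.operators.generic_transfer import GenericTransfer

with DAG(
    dag_id="generic_transfer_example",
    schedule_interval=None,
    start_date=pendulum.datetime(2022, 7, 1),
    catchup=False,
):
    GenericTransfer(
        task_id="copy_orders",
        source_conn_id="source_postgres",          # hypothetical connection ids
        destination_conn_id="destination_mysql",
        sql="SELECT * FROM orders WHERE order_date = '{{ ds }}'",  # fetched fully into memory
        destination_table="orders_copy",
    )
```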

Looks like we have run out of options, right? Not really.

There is a much better solution. In Airflow 2 we added something that helps to solve the problem in a much more natural way: the combination of the TaskFlow API and Hooks. We just did not realize back then that it solves the transfer problem in an unexpected way, by making Hooks as easy to add to your DAGs as Operators.

Hooks are a somewhat under-valued Airflow abstraction. Most users only know and care about Operators, but the vast majority of Airflow Operators are just thin wrappers over a Hook’s methods. Not everyone realizes that in almost all cases the operator’s methods perform only initial validation and delegate the execution and logic of the operation to the underlying Hook. This pattern has been consistently followed for years, yet Hooks themselves have not been used directly as much as they could be.

You could always use Hooks in a PythonOperator, for example, but this is not common knowledge or an often-followed practice. The TaskFlow API, however, introduced a way to run Python code in your tasks with far less boilerplate. It makes things easy enough that writing a transfer task with it is as simple as adding two Operators to a DAG.

Check out this example:
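A minimal sketch of such a transfer could look like the following. The bucket and object names are placeholders, it assumes the Amazon and Google provider packages are installed, and it uses the default aws_default and google_cloud_default connection IDs:

```python
import tempfile

import pendulum
from airflow.decorators import dag, task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.google.cloud.hooks.gcs import GCSHook


@dag(schedule_interval=None, start_date=pendulum.datetime(2022, 7, 1), catchup=False)
def s3_to_gcs_transfer():
    @task
    def transfer_file():
        s3_hook = S3Hook(aws_conn_id="aws_default")
        gcs_hook = GCSHook(gcp_conn_id="google_cloud_default")
        with tempfile.NamedTemporaryFile() as tmp:
            # Download the object from S3 to a temporary local file ...
            s3_hook.get_key(key="my-file.csv", bucket_name="my-s3-bucket").download_fileobj(tmp)
            tmp.flush()
            # ... optionally transform the data here ...
            # ... and upload the same local file to GCS.
            gcs_hook.upload(
                bucket_name="my-gcs-bucket",
                object_name="my-file.csv",
                filename=tmp.name,
            )

    transfer_file()


s3_to_gcs_transfer()
```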

Yes. It’s that simple.

This is a working example of an S3 to GCS transfer that “just works”. And you are perfectly free to add anything else that is needed; you are not limited to what the S3ToGCSOperator has to offer. It also uses the very same connections you have already defined for your DAGs.

You are perfectly free to add compression or authentication parameters, process multiple files in a loop, or perform any transformation of the data between the “download” and “upload” operations. And there is more: if you look at what the Hooks offer as an API, you can even read the file from S3 in “chunks” and write it to GCS in chunks, which might make the transfer 2x faster and lower your temporary storage requirements, as sketched below. All that without adding an unnecessary level of abstraction.
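A rough sketch of that chunked variant could look like this. Again the bucket and object names are placeholders; it relies on the streaming body returned by boto3 and on the writable file-like interface that recent versions of google-cloud-storage expose for blobs:

```python
from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.google.cloud.hooks.gcs import GCSHook


@task
def transfer_file_in_chunks():
    s3_hook = S3Hook(aws_conn_id="aws_default")
    gcs_hook = GCSHook(gcp_conn_id="google_cloud_default")

    # boto3 returns a streaming body that can be read chunk by chunk.
    source = s3_hook.get_key(key="my-file.csv", bucket_name="my-s3-bucket")
    stream = source.get()["Body"]

    # google-cloud-storage blobs can be opened as writable file-like objects,
    # so the full file never has to sit in memory or on local disk.
    destination_blob = gcs_hook.get_conn().bucket("my-gcs-bucket").blob("my-file.csv")
    with destination_blob.open("wb") as destination:
        for chunk in stream.iter_chunks(chunk_size=8 * 1024 * 1024):
            # Any per-chunk transformation could happen here.
            destination.write(chunk)
```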

And most importantly, it does not force you to change the way your DAGs are written in general. You can modernize only the parts that are best suited for it, and do it incrementally, without turning everything upside down and rewriting all your DAGs. As you can read in the TaskFlow API Tutorial, you can easily combine “classic” operators with “TaskFlow” ones.

So if you need to do a transfer task in Airflow and are looking for an operator that does the job, consider just writing a few lines of code to achieve what you want.
