Working with Arrow Datasets and dplyr

Apache Arrow lets you work efficiently with large, multi-file datasets. The arrow R package provides a dplyr interface to Arrow Datasets, and other tools for interactive exploration of Arrow data.

This vignette introduces Datasets and shows how to use dplyr to analyze them.

Example: NYC taxi data

The New York City taxi trip record data is widely used in big data exercises and competitions. For demonstration purposes, we have hosted a Parquet-formatted version of about ten years of the trip data in a public Amazon S3 bucket.

The total file size is around 37 gigabytes, even in the efficient Parquet file format. That’s bigger than memory on most people’s computers, so you can’t just read it all in and stack it into a single data frame.

In Windows (for R > 3.6) and macOS binary packages, S3 support is included. On Linux, when installing from source, S3 support is not enabled by default, and it has additional system requirements. See vignette("install", package = "arrow") for details. To see if your arrow installation has S3 support, run:

arrow::arrow_with_s3()
## [1] TRUE

Even with S3 support enabled, network speed will be a bottleneck unless your machine is located in the same AWS region as the data. So, for this vignette, we assume that the NYC taxi dataset has been downloaded locally in an “nyc-taxi” directory.

Retrieving data from a public Amazon S3 bucket

If your arrow build has S3 support, you can sync the data locally with:

arrow::copy_files("s3://ursa-labs-taxi-data", "nyc-taxi")

If your arrow build doesn’t have S3 support, you can download the files with the additional code shown below. Since these are large files, you may need to increase R’s download timeout from the default of 60 seconds, e.g. options(timeout = 300).

bucket <- "https://ursa-labs-taxi-data.s3.us-east-2.amazonaws.com"
for (year in 2009:2019) {
  if (year == 2019) {
    # We only have through June 2019 there
    months <- 1:6
  } else {
    months <- 1:12
  }
  for (month in sprintf("%02d", months)) {
    dir.create(file.path("nyc-taxi", year, month), recursive = TRUE)
    try(download.file(
      paste(bucket, year, month, "data.parquet", sep = "/"),
      file.path("nyc-taxi", year, month, "data.parquet"),
      mode = "wb"
    ), silent = TRUE)
  }
}

Note that these download steps in the vignette are not executed: if you want to run with live data, you’ll have to do it yourself separately. Given the size, if you’re running this locally and don’t have a fast connection, feel free to grab only a year or two of data.

If you don’t have the taxi data downloaded, the vignette will still run and will yield previously cached output for reference. To be explicit about which version is running, let’s check whether you’re running with live data:

dir.exists("nyc-taxi")
## [1] FALSE

Opening the dataset

Because dplyr is not necessary for many Arrow workflows, it is an optional (Suggests) dependency. So, to work with Datasets, you need to load both arrow and dplyr.

library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

The first step is to create a Dataset object, pointing at the directory of data.

ds <- open_dataset("nyc-taxi", partitioning = c("year", "month"))

The file format for open_dataset() is controlled by the format parameter, which has a default value of "parquet". If you had a directory of Arrow format files, you could instead specify format = "arrow" in the call.

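Other supported formats include "feather" or "ipc" (aliases for "arrow", since Feather v2 is the Arrow file format), "csv" (comma-delimited files), "tsv" (tab-delimited files), and "text" (generic text-delimited files, for which you specify the delimiter yourself).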

For text files, you can pass parsing options such as delim, quote, escape_double, escape_backslash, and skip_empty_rows to open_dataset().

For more information on the usage of these parameters, see ?read_delim_arrow().
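As a minimal sketch of passing a parsing option, the following writes a small semicolon-delimited file to a temporary directory and opens it with format = "text" and delim = ";". The directory name, file name, and data are made up for illustration.

```r
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

# Hypothetical scratch directory and toy data, for illustration only
txt_dir <- file.path(tempdir(), "semicolon-data")
dir.create(txt_dir, showWarnings = FALSE)
writeLines(
  c("id;fare", "1;10.5", "2;7.25", "3;31"),
  file.path(txt_dir, "part-0.txt")
)

# delim is passed through to the text reader
txt_ds <- open_dataset(txt_dir, format = "text", delim = ";")

# Query it like any other Dataset
fares <- txt_ds %>%
  filter(fare > 10) %>%
  collect()
```

Only the rows with fare above 10 are pulled into R; the rest of the pipeline is the same as for Parquet data.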

The partitioning argument lets you specify how the file paths provide information about how the dataset is chunked into different files. The files in this example have file paths like

2009/01/data.parquet
2009/02/data.parquet
...

By providing c("year", "month") to the partitioning argument, you’re saying that the first path segment gives the value for year, and the second segment is month. Every row in 2009/01/data.parquet has a value of 2009 for year and 1 for month, even though those columns may not be present in the file.

Indeed, when you look at the dataset, you can see that in addition to the columns present in every file, there are also columns year and month even though they are not present in the files themselves.

ds
## 
## FileSystemDataset with 125 Parquet files
## vendor_id: string
## pickup_at: timestamp[us]
## dropoff_at: timestamp[us]
## passenger_count: int8
## trip_distance: float
## pickup_longitude: float
## pickup_latitude: float
## rate_code_id: null
## store_and_fwd_flag: string
## dropoff_longitude: float
## dropoff_latitude: float
## payment_type: string
## fare_amount: float
## extra: float
## mta_tax: float
## tip_amount: float
## tolls_amount: float
## total_amount: float
## year: int32
## month: int32
## 
## See $metadata for additional Schema metadata
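If you don’t have the taxi data at hand, the same behavior can be reproduced on a toy scale. This sketch assumes a hypothetical scratch directory laid out like the taxi files, with a single column in each Parquet file, and shows that year and month appear in the schema anyway.

```r
library(arrow, warn.conflicts = FALSE)

# Hypothetical scratch directory mimicking the year/month layout
root <- file.path(tempdir(), "toy-taxi")
for (month in c("01", "02")) {
  dir.create(file.path(root, "2009", month), recursive = TRUE, showWarnings = FALSE)
  write_parquet(
    data.frame(fare_amount = c(5.5, 12)),
    file.path(root, "2009", month, "data.parquet")
  )
}

toy_ds <- open_dataset(root, partitioning = c("year", "month"))

# The files hold one column; year and month are derived from the paths
toy_ds$schema$names
```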

The other form of partitioning currently supported is Hive-style, in which the partition variable names are included in the path segments. If you had saved your files in paths like:

year=2009/month=01/data.parquet
year=2009/month=02/data.parquet
...

you would not have had to provide the names in partitioning; you could have just called ds <- open_dataset("nyc-taxi") and the partitions would have been detected automatically.
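A small sketch of the Hive-style case, using toy data and a hypothetical temporary directory: writing a grouped data frame produces key=value path segments, and open_dataset() then recovers the partition columns without a partitioning argument.

```r
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

# Hypothetical output location and toy data
hive_root <- file.path(tempdir(), "toy-taxi-hive")
toy <- data.frame(
  year = c(2009L, 2009L, 2010L),
  month = c(1L, 2L, 1L),
  fare_amount = c(5.5, 12, 8)
)

# Grouping columns become key=value directory names by default
toy %>%
  group_by(year, month) %>%
  write_dataset(hive_root, format = "parquet")

hive_paths <- list.files(hive_root, recursive = TRUE)

# No partitioning argument needed: names are read from the paths
hive_ds <- open_dataset(hive_root)
hive_ds$schema$names
```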

Querying the dataset

Up to this point, you haven’t loaded any data. You’ve walked directories to find files, you’ve parsed file paths to identify partitions, and you’ve read the headers of the Parquet files to inspect their schemas so that you can make sure they all are as expected.

In the current release, arrow supports the dplyr verbs mutate(), transmute(), select(), rename(), relocate(), filter(), and arrange(). Aggregation is not yet supported, so before you call summarise() or other verbs with aggregate functions, use collect() to pull the selected subset of the data into an in-memory R data frame.

If you call an unsupported dplyr verb or an unimplemented function in a query on an Arrow Dataset, the arrow package raises an error. For dplyr queries on Arrow Table objects, which are already in memory, the package instead calls collect() automatically before processing that dplyr verb.

Here’s an example: suppose that you are curious about tipping behavior among the longest taxi rides. Let’s find the median tip percentage for rides with fares greater than $100 in 2015, broken down by the number of passengers:

system.time(ds %>%
  filter(total_amount > 100, year == 2015) %>%
  select(tip_amount, total_amount, passenger_count) %>%
  mutate(tip_pct = 100 * tip_amount / total_amount) %>%
  group_by(passenger_count) %>%
  collect() %>%
  summarise(
    median_tip_pct = median(tip_pct),
    n = n()
  ) %>%
  print())
## 
## # A tibble: 10 x 3
##    passenger_count median_tip_pct      n
##              <int>          <dbl>  <int>
##  1               0           9.84    380
##  2               1          16.7  143087
##  3               2          16.6   34418
##  4               3          14.4    8922
##  5               4          11.4    4771
##  6               5          16.7    5806
##  7               6          16.7    3338
##  8               7          16.7      11
##  9               8          16.7      32
## 10               9          16.7      42
## 
##    user  system elapsed
##   4.436   1.012   1.402

You’ve just selected a subset out of a dataset with around 2 billion rows, computed a new column, and aggregated it in under 2 seconds on a modern laptop. How does this work?

First, mutate()/transmute(), select()/rename()/relocate(), filter(), group_by(), and arrange() record their actions but don’t evaluate on the data until you run collect().

ds %>%
  filter(total_amount > 100, year == 2015) %>%
  select(tip_amount, total_amount, passenger_count) %>%
  mutate(tip_pct = 100 * tip_amount / total_amount) %>%
  group_by(passenger_count)
## 
## FileSystemDataset (query)
## tip_amount: float
## total_amount: float
## passenger_count: int8
## tip_pct: expr
## 
## * Filter: ((total_amount > 100) and (year == 2015))
## * Grouped by passenger_count
## See $.data for the source Arrow object

This code returns an output instantly and shows the manipulations you’ve made, without loading data from the files. Because the evaluation of these queries is deferred, you can build up a query that selects down to a small subset without generating intermediate datasets that would potentially be large.

Second, all work is pushed down to the individual data files, and depending on the file format, chunks of data within the files. As a result, you can select a subset of data from a much larger dataset by collecting the smaller slices from each file—you don’t have to load the whole dataset in memory to slice from it.

Third, because of partitioning, you can ignore some files entirely. In this example, by filtering year == 2015, all files corresponding to other years are immediately excluded: you don’t have to load them in order to find that no rows match the filter. Relatedly, since Parquet files contain row groups with statistics on the data within, there may be entire chunks of data you can avoid scanning because they have no rows where total_amount > 100.
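The deferred evaluation described above can be observed on a toy dataset. This is only a sketch with made-up data in a hypothetical temporary directory; it illustrates that the pipeline yields a query object until collect() is called, not the file-skipping itself.

```r
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

# Hypothetical toy dataset, partitioned by year
lazy_root <- file.path(tempdir(), "toy-lazy")
data.frame(
  year = c(2014L, 2015L, 2015L, 2015L),
  total_amount = c(150, 120, 80, 200),
  tip_amount = c(15, 24, 8, 20)
) %>%
  group_by(year) %>%
  write_dataset(lazy_root, format = "parquet")

query <- open_dataset(lazy_root) %>%
  filter(total_amount > 100, year == 2015) %>%
  select(tip_amount, total_amount) %>%
  mutate(tip_pct = 100 * tip_amount / total_amount)

# Still a query object, not a data frame
class(query)

# Only now is data read and the filter applied
result <- collect(query)
```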

Processing data in batches

Sometimes you want to run R code on the entire dataset, but that dataset is much larger than memory. You can use map_batches on a dataset query to process it batch-by-batch.

Note: map_batches is experimental and not recommended for production use.

As an example, to randomly sample a dataset, use map_batches to sample a percentage of rows from each batch:

sampled_data <- ds %>%
  filter(year == 2015) %>%
  select(tip_amount, total_amount, passenger_count) %>%
  map_batches(~ sample_frac(as.data.frame(.), 1e-4)) %>%
  mutate(tip_pct = tip_amount / total_amount)

str(sampled_data)
## 
## 'data.frame':    15603 obs. of  4 variables:
##  $ tip_amount     : num  0 0 1.55 1.45 5.2 ...
##  $ total_amount   : num  5.8 16.3 7.85 8.75 26 ...
##  $ passenger_count: int  1 1 1 1 1 6 5 1 2 1 ...
##  $ tip_pct        : num  0 0 0.197 0.166 0.2 ...

This function can also be used to aggregate summary statistics over a dataset by computing partial results for each batch and then aggregating those partial results. Extending the example above, you could fit a model to the sample data and then use map_batches to compute the MSE on the full dataset.

model <- lm(tip_pct ~ total_amount + passenger_count, data = sampled_data)

ds %>%
  filter(year == 2015) %>%
  select(tip_amount, total_amount, passenger_count) %>%
  mutate(tip_pct = tip_amount / total_amount) %>%
  map_batches(function(batch) {
    batch %>%
      as.data.frame() %>%
      mutate(pred_tip_pct = predict(model, newdata = .)) %>%
      filter(!is.nan(tip_pct)) %>%
      summarize(sse_partial = sum((pred_tip_pct - tip_pct)^2), n_partial = n())
  }) %>%
  summarize(mse = sum(sse_partial) / sum(n_partial)) %>%
  pull(mse)
## 
## [1] 0.1304284

More dataset options

There are a few ways you can control the Dataset creation to adapt to special use cases.

Work with files in a directory

If you are working with a single file or a set of files that are not all in the same directory, you can provide a file path or a vector of multiple file paths to open_dataset(). This is useful if, for example, you have a single CSV file that is too big to read into memory. You could pass the file path to open_dataset(), use group_by() to partition the Dataset into manageable chunks, then use write_dataset() to write each chunk to a separate Parquet file—all without needing to read the full CSV file into R.
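The CSV-to-Parquet workflow just described might look like the following sketch. The tiny CSV stands in for a file too large for memory, and the paths and column names are assumptions chosen for illustration.

```r
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

# Hypothetical paths; the small CSV stands in for a very large one
csv_file <- file.path(tempdir(), "big.csv")
out_dir <- file.path(tempdir(), "big-parquet")
write.csv(
  data.frame(payment_type = c("Cash", "Card", "Cash"), fare = c(5, 12, 8)),
  csv_file,
  row.names = FALSE
)

# Open the single file as a Dataset and write it back out in
# Parquet format, one directory per payment_type
open_dataset(csv_file, format = "csv") %>%
  group_by(payment_type) %>%
  write_dataset(out_dir, format = "parquet")

out_files <- list.files(out_dir, recursive = TRUE)
```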

Explicitly declare column names and data types

You can specify the schema argument to open_dataset() to declare the columns and their data types. This is useful if you have data files with different storage schemas (for example, a column could be int32 in one file and int8 in another) and you want to ensure that the resulting Dataset has a specific type.

To be clear, it’s not necessary to specify a schema, even in this example of mixed integer types, because the Dataset constructor will reconcile differences like these. The schema specification just lets you declare what you want the result to be.

Explicitly declare partition format

Similarly, you can provide a Schema in the partitioning argument of open_dataset() in order to declare the types of the virtual columns that define the partitions. This would be useful, in the taxi dataset example, if you wanted to keep month as a string instead of an integer.
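As a sketch of declaring partition types, the following builds a one-file toy dataset in a hypothetical temporary directory and asks for month to be kept as a string.

```r
library(arrow, warn.conflicts = FALSE)

# Hypothetical scratch directory with a single year/month file
part_root <- file.path(tempdir(), "toy-taxi-typed")
dir.create(file.path(part_root, "2009", "01"), recursive = TRUE, showWarnings = FALSE)
write_parquet(
  data.frame(fare_amount = c(5.5, 12)),
  file.path(part_root, "2009", "01", "data.parquet")
)

# Declare both the names and the types of the partition columns
typed_ds <- open_dataset(
  part_root,
  partitioning = schema(year = int32(), month = utf8())
)

month_type <- typed_ds$schema$GetFieldByName("month")$type
month_type$ToString()
```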

Work with multiple data sources

Another feature of Datasets is that they can be composed of multiple data sources. That is, you may have a directory of partitioned Parquet files in one location, and in another directory, files that haven’t been partitioned. Or, you could point to an S3 bucket of Parquet data and a directory of CSVs on the local file system and query them together as a single dataset. To create a multi-source dataset, provide a list of datasets to open_dataset() instead of a file path, or simply concatenate them like big_dataset <- c(ds1, ds2).
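A minimal sketch of a multi-source dataset, assuming two hypothetical local directories with the same columns. Both sources are Parquet here to keep the example small.

```r
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

# Two hypothetical source directories with identical columns
dir_a <- file.path(tempdir(), "source-a")
dir_b <- file.path(tempdir(), "source-b")
write_dataset(data.frame(id = 1:3, fare = c(5, 12, 8)), dir_a, format = "parquet")
write_dataset(data.frame(id = 4:5, fare = c(20, 3)), dir_b, format = "parquet")

ds1 <- open_dataset(dir_a)
ds2 <- open_dataset(dir_b)

# Combine the two sources and query them as one
big_dataset <- open_dataset(list(ds1, ds2))

combined <- big_dataset %>%
  filter(fare > 4) %>%
  collect()
```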

Writing datasets

As you can see, querying a large dataset can be made quite fast by storage in an efficient binary columnar format like Parquet or Feather and partitioning based on columns commonly used for filtering. However, data isn’t always stored that way. Sometimes you might start with one giant CSV. The first step in analyzing data is cleaning it up and reshaping it into a more usable form.

The write_dataset() function allows you to take a Dataset or another tabular data object—an Arrow Table or RecordBatch, or an R data frame—and write it to a different file format, partitioned into multiple files.

Assume that you have a version of the NYC Taxi data as CSV:

ds <- open_dataset("nyc-taxi/csv/", format = "csv")

You can write it to a new location and translate the files to the Feather format by calling write_dataset() on it:

write_dataset(ds, "nyc-taxi/feather", format = "feather")

Next, let’s imagine that the payment_type column is something you often filter on, so you want to partition the data by that variable. By doing so you ensure that a filter like payment_type == "Cash" will touch only a subset of files where payment_type is always "Cash".

One natural way to express the columns you want to partition on is to use the group_by() method:

ds %>%
  group_by(payment_type) %>%
  write_dataset("nyc-taxi/feather", format = "feather")

This will write files to a directory tree that looks like this:

system("tree nyc-taxi/feather")
## feather
## ├── payment_type=1
## │   └── part-18.feather
## ├── payment_type=2
## │   └── part-19.feather
## ...
## └── payment_type=UNK
##     └── part-17.feather
##
## 18 directories, 23 files

Note that the directory names are payment_type=Cash and similar: this is the Hive-style partitioning described above. This means that when you call open_dataset() on this directory, you don’t have to declare what the partitions are because they can be read from the file paths. (To instead write bare values for partition segments, i.e. Cash rather than payment_type=Cash, call write_dataset() with hive_style = FALSE.)
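To compare the two naming schemes side by side, here is a sketch on toy data. The output directories are hypothetical, and the payment types are made-up values.

```r
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

trips <- data.frame(payment_type = c("Cash", "Card", "Cash"), fare = c(5, 12, 8))
hive_dir <- file.path(tempdir(), "by-payment-hive")
bare_dir <- file.path(tempdir(), "by-payment-bare")

# Default: key=value directory names
trips %>%
  group_by(payment_type) %>%
  write_dataset(hive_dir, format = "feather")

# Bare values only
trips %>%
  group_by(payment_type) %>%
  write_dataset(bare_dir, format = "feather", hive_style = FALSE)

hive_names <- list.dirs(hive_dir, full.names = FALSE, recursive = FALSE)
bare_names <- list.dirs(bare_dir, full.names = FALSE, recursive = FALSE)

# Bare segments carry no names, so supply them when reading back
bare_ds <- open_dataset(bare_dir, format = "feather", partitioning = "payment_type")
```

The trade-off is that the bare layout is more compact on disk, while the Hive-style layout is self-describing.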

Perhaps, though, payment_type == "Cash" is the only data you ever care about, and you just want to drop the rest and have a smaller working set. For this, you can filter() them out when writing:

ds %>%
  filter(payment_type == "Cash") %>%
  write_dataset("nyc-taxi/feather", format = "feather")

The other thing you can do when writing datasets is select a subset of columns or reorder them. Suppose you never care about vendor_id, and being a string column, it can take up a lot of space when you read it in, so let’s drop it:

ds %>%
  group_by(payment_type) %>%
  select(-vendor_id) %>%
  write_dataset("nyc-taxi/feather", format = "feather")

Note that while you can select a subset of columns, you cannot currently rename columns when writing a dataset.

Partitioning performance considerations

Partitioning datasets has two aspects that affect performance: it increases the number of files and it creates a directory structure around the files. Both of these have benefits as well as costs. Depending on the configuration and the size of your dataset, the costs can outweigh the benefits.

Because partitions split up the dataset into multiple files, partitioned datasets can be read and written with parallelism. However, each additional file adds a little overhead in processing for filesystem interaction. It also increases the overall dataset size since each file has some shared metadata. For example, each Parquet file contains the schema and group-level statistics. The number of partitions is a floor for the number of files. If you partition a dataset by date with a year of data, you will have at least 365 files. If you further partition by another dimension with 1,000 unique values, you will have up to 365,000 files. Partitioning this finely often leads to small files that mostly consist of metadata.

Partitioned datasets create nested folder structures, and those allow us to prune which files are loaded in a scan. However, this adds overhead to discovering files in the dataset, as we’ll need to recursively “list directory” to find the data files. Overly fine partitions can cause problems here: partitioning a dataset by date for a year’s worth of data will require 365 list calls to find all the files; adding another column with cardinality 1,000 will make that 365,365 calls (the original 365 plus one for each of the 365,000 nested directories).

The optimal partitioning layout will depend on your data, access patterns, and which systems will be reading the data. Most systems, including Arrow, should work across a range of file sizes and partitioning layouts, but there are extremes you should avoid. Two guidelines can help avoid some known worst cases: avoid files smaller than 20MB or larger than 2GB, and avoid partitioning layouts with more than 10,000 distinct partitions.

For file formats that have a notion of groups within a file, such as Parquet, similar guidelines apply. Row groups can provide parallelism when reading and allow data skipping based on statistics, but very small groups can cause metadata to be a significant portion of file size. Arrow’s file writer provides sensible defaults for group sizing in most cases.

Transactions / ACID guarantees

The dataset API offers no transaction support or any ACID guarantees. This affects both reading and writing. Concurrent reads are fine. Concurrent writes, or writes concurrent with reads, may have unexpected behavior. Various approaches can be used to avoid operating on the same files, such as using a unique basename template for each writer, a temporary directory for new files, or separate storage of the file list instead of relying on directory discovery.
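As an illustration of the first approach, the sketch below gives each of two hypothetical writers its own basename_template so that their output files cannot collide by name. The writes run one after the other here; the example only shows the naming scheme and does not by itself make concurrent writes safe.

```r
library(arrow, warn.conflicts = FALSE)

# Hypothetical shared output directory
shared_dir <- file.path(tempdir(), "shared-output")

# Two hypothetical writers, each with its own file name prefix.
# The template must contain "{i}", which is replaced by a counter.
write_dataset(
  data.frame(id = 1:2), shared_dir,
  format = "parquet", basename_template = "writer-a-{i}.parquet"
)
write_dataset(
  data.frame(id = 3:4), shared_dir,
  format = "parquet", basename_template = "writer-b-{i}.parquet"
)

written <- list.files(shared_dir)
```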

Unexpectedly killing the process while a write is in progress can leave the system in an inconsistent state. Write calls generally return as soon as the bytes to be written have been completely delivered to the OS page cache. Even though a write operation has been completed, it is possible for part of the file to be lost if there is a sudden power loss immediately after the write call.

Most file formats have magic numbers which are written at the end. This means a partial file write can safely be detected and discarded. The CSV file format does not have any such concept and a partially written CSV file may be detected as valid.
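For Parquet specifically, the trailing magic number is the four bytes "PAR1". The helper below, has_parquet_footer(), is a hypothetical function written for this sketch and is not part of arrow. It checks only the last four bytes, so it can detect a truncated file but does not validate the rest of the contents.

```r
library(arrow, warn.conflicts = FALSE)

# Hypothetical helper: TRUE if the file ends with the Parquet magic "PAR1"
has_parquet_footer <- function(path) {
  size <- file.size(path)
  if (is.na(size) || size < 4) {
    return(FALSE)
  }
  con <- file(path, "rb")
  on.exit(close(con))
  seek(con, where = size - 4)
  tail_bytes <- readBin(con, what = "raw", n = 4)
  # Compare raw bytes directly; the tail of a partial file may contain nulls
  identical(tail_bytes, charToRaw("PAR1"))
}

complete_file <- file.path(tempdir(), "complete.parquet")
write_parquet(data.frame(id = 1:10), complete_file)

# Simulate an interrupted write by keeping only the first half of the bytes
all_bytes <- readBin(complete_file, what = "raw", n = file.size(complete_file))
truncated_file <- file.path(tempdir(), "truncated.parquet")
writeBin(all_bytes[seq_len(length(all_bytes) %/% 2)], truncated_file)

complete_ok <- has_parquet_footer(complete_file)
truncated_ok <- has_parquet_footer(truncated_file)
```

No equivalent check is possible for CSV, which is why a partially written CSV file can pass as valid.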