Ropendal: Abstract Filesystem Access Interface for R via the Rust crate opendal, using savvy for R/Rust FFI. We keep the bottom layer byte-first: filesystem operations move raw bytes, and serializers or codecs explicitly materialize those bytes into R objects.
Async work uses Aio handles inspired by nanonext: we issue work with fs_*_aio(), wait with call_aio(), and retrieve the payload (or an opendalErrorValue) with collect_aio().
We expose the same principles to native packages through a pure C API in inst/include/ropendal.h: submit async work, wait on an Aio, then read borrowed results or fill caller-owned buffers without routing data through R raw vectors.
Serializers and codecs are first-class. Serializer/deserializer pairs control R-object materialization; native codecs such as gzip and zlib transform raw bytes. Both layers stay explicit through the mode argument, serial_config(), codec_config(), serialize_raw(), and deserialize_raw().
Backends are configured explicitly: we pass HTTP headers to opendal("http") through the headers argument, and we pass credential provider objects such as credentials_s3(), credentials_gcs(), credentials_azblob(), and credentials_gdrive() through the auth argument.
Installation
Ropendal is available from R-universe:
# Install from R-universe; the CRAN mirror is listed so dependencies resolve.
install.packages(
"Ropendal",
repos = c("https://sounkou-bioinfo.r-universe.dev", "https://cloud.r-project.org")
)R-universe also publishes Linux binaries for Ubuntu noble on x86_64 and arm64 for R-release and R-devel. To prefer those binaries on Linux, point repos at the known binary repositories for this universe and CRAN dependencies:
# Build binary-repository URLs of the form .../bin/linux/noble-<arch>/<x.y>/.
# substr() keeps the first three characters of the R version (e.g. "4.4").
options(repos = c(
Ropendal = sprintf(
"https://sounkou-bioinfo.r-universe.dev/bin/linux/noble-%s/%s/",
R.version$arch,
substr(getRversion(), 1, 3)
),
CRAN = sprintf(
"https://cran.r-universe.dev/bin/linux/noble-%s/%s/",
R.version$arch,
substr(getRversion(), 1, 3)
)
))
install.packages("Ropendal")Source builds can still use explicit OpenDAL feature flags when you need custom provider wiring.
# Source installs (type = "source") compile the Rust library locally;
# configure.args selects which OpenDAL services are compiled in.
# Keep only local filesystem, HTTP, S3-compatible, and Google Drive support.
install.packages(
"Ropendal",
repos = c("https://sounkou-bioinfo.r-universe.dev", "https://cloud.r-project.org"),
type = "source",
configure.args = "--without-default-rust-features --with-rust-features=fs,http,s3,gdrive"
)
# Add the current cloud-service feature group explicitly.
install.packages(
"Ropendal",
repos = c("https://sounkou-bioinfo.r-universe.dev", "https://cloud.r-project.org"),
type = "source",
configure.args = "--enable-cloud"
)A Quick Start
A single filesystem handle gives us byte primitives, vectorized batches, Aio handles, explicit serializers/codecs, and lower-level iterators.
Create a handle and move bytes
library(Ropendal)
# Back the handle with a fresh temporary directory.
root <- tempfile("ropendal-readme-")
dir.create(root, recursive = TRUE)
fs <- opendal("fs", root = root)
# Writes take raw vectors and reads return raw vectors.
fs_write(fs, "note.txt", charToRaw("hello ropendal\n"))
#> [1] TRUE
rawToChar(fs_read(fs, "note.txt"))
#> [1] "hello ropendal\n"
# Metadata comes back as a named list; keep a few fields.
fs_stat(fs, "note.txt")[c("path", "type", "size")]
#> $path
#> [1] "note.txt"
#>
#> $type
#> [1] "file"
#>
#> $size
#> [1] 15
# Each listing entry carries a `path` element.
vapply(fs_ls(fs), `[[`, character(1), "path")
#> [1] "note.txt"Submit a batch in one call
# Vectorized call: one payload per path, run with up to two operations at once.
paths <- c("batch/one.txt", "batch/two.txt")
fs_write(
fs,
paths,
list(charToRaw("one\n"), charToRaw("two\n")),
batch_concurrency = 2
)
#> [[1]]
#> [1] TRUE
#>
#> [[2]]
#> [1] TRUE
# One offset per path; the result is a list of raw vectors.
many <- fs_read(fs, paths, offset = c(0, 0), batch_concurrency = 2)
vapply(many, rawToChar, character(1))
#> [1] "one\n" "two\n"Start async work and collect later
# Submit the read, wait on the Aio handle, then collect the raw payload.
aio <- fs_read_aio(fs, "note.txt")
call_aio(aio)
rawToChar(collect_aio(aio))
#> [1] "hello ropendal\n"Materialize tables and bytes explicitly
A serializer/deserializer pair can use nanoarrow to turn a data frame into Arrow IPC bytes and back. Native codecs such as gzip remain explicit byte transforms.
# sfunc turns a data frame into Arrow IPC bytes; ufunc turns bytes back into
# a data frame.
arrow_config <- serial_config(
"data.frame",
sfunc = function(x) {
con <- rawConnection(raw(), "wb")
on.exit(close(con), add = TRUE)
nanoarrow::write_nanoarrow(x, con)
# The value is read before on.exit() closes the connection.
rawConnectionValue(con)
},
ufunc = function(raw) as.data.frame(nanoarrow::read_nanoarrow(raw))
)
arrow_tbl <- data.frame(
id = 1:3,
sample = c("HG001", "HG002", "HG003"),
depth = c(32.5, 28.0, 41.25)
)
# mode = "serial" routes the object through arrow_config in both directions.
fs_replace(fs, "tables/depth.arrows", arrow_tbl, mode = "serial", serial_config = arrow_config)
#> [1] TRUE
fs_read(fs, "tables/depth.arrows", mode = "serial", serial_config = arrow_config)
#> id sample depth
#> 1 1 HG001 32.50
#> 2 2 HG002 28.00
#> 3 3 HG003 41.25
# mode = "codec" applies the gzip byte transform on write and reverses it on read.
fs_replace(fs, "objects/message.gz", charToRaw("compressed bytes\n"), mode = "codec", codec = "gzip")
#> [1] TRUE
rawToChar(fs_read(fs, "objects/message.gz", mode = "codec", codec = "gzip"))
#> [1] "compressed bytes\n"Drop down to lower-level byte iterators
# Stream one object through an explicit writer, chunk by chunk.
writer <- fs_write_iter(fs, "stream.txt")
write_iter_write(writer, charToRaw("hello "))
#> [1] TRUE
write_iter_write(writer, charToRaw("iterator\n"))
#> [1] TRUE
write_iter_close(writer)
#> [1] TRUE
# Read back in 5-byte chunks; fs_tell()/fs_seek() use byte offsets.
reader <- fs_read_iter(fs, "stream.txt", chunk_size = 5)
first <- read_iter_next(reader)
rawToChar(first$data)
#> [1] "hello"
fs_tell(reader)
#> [1] 5
# Skip the space at offset 5, then collect everything that remains.
fs_seek(reader, 6)
#> [1] 6
rawToChar(read_iter_collect(reader))
#> [1] "iterator\n"Tour of the API across backends
The same fs_* calls apply to HTTP(S), S3, and Google Drive handles.
HTTP/HTTPS read example
We define the headers once, require them on the local fixture, and pass the same headers to the HTTP filesystem handle.
root <- tempfile("ropendal-http-readme-")
dir.create(root, recursive = TRUE)
# writeLines() appends a newline, so the file is 19 bytes.
writeLines("hello http example", file.path(root, "hello.txt"))
headers <- list(
Authorization = "Bearer ropendal-readme",
`X-Ropendal-Example` = "headers"
)
# Serve `root` locally; the fixture requires these headers on each request,
# and the HTTP handle sends the same headers.
fixture <- OpendalHttpFixture$start(root, required_headers = headers)
http_fs <- opendal("http", endpoint = fixture$endpoint(), root = "/", headers = headers)
rawToChar(fs_read(http_fs, "hello.txt"))
#> [1] "hello http example\n"
fs_stat(http_fs, "hello.txt")[c("path", "type", "size")]
#> $path
#> [1] "hello.txt"
#>
#> $type
#> [1] "file"
#>
#> $size
#> [1] 19
fixture$stop()
#> [1] TRUES3-compatible store (local MinIO)
We start a temporary MinIO instance and create the s3_fs handle in a setup step that is not shown, then check that the S3-compatible store supports the same byte API.
We write, read, stat, and list objects with the same fs_* functions.
# The same byte API against the S3-compatible handle from the setup step.
fs_write(s3_fs, "notes/a.txt", charToRaw("hello s3-compatible store\n"))
#> [1] TRUE
fs_write(s3_fs, "notes/b.txt", charToRaw("another object\n"))
#> [1] TRUE
rawToChar(fs_read(s3_fs, "notes/a.txt"))
#> [1] "hello s3-compatible store\n"
fs_stat(s3_fs, "notes/a.txt")[c("path", "type", "size")]
#> $path
#> [1] "notes/a.txt"
#>
#> $type
#> [1] "file"
#>
#> $size
#> [1] 26
# Listing under a prefix returns entries whose paths include the prefix.
vapply(fs_ls(s3_fs, "notes/"), `[[`, character(1), "path")
#> [1] "notes/a.txt" "notes/b.txt"The async path returns an Aio handle; we wait and collect when we need the payload.
# Aio works the same way on the S3 handle: submit, wait, collect.
aio <- fs_read_aio(s3_fs, "notes/b.txt")
call_aio(aio)
rawToChar(collect_aio(aio))
#> [1] "another object\n"For a local benchmark, we use a larger (64 MiB) object to compare Ropendal’s default path with its chunked/concurrent path. Local MinIO removes most network latency, so concurrency helps most when the object is large or the store is remote; for many independent objects, use batch_concurrency instead. paws.storage is included as a single-request baseline.
# readme_set_aws_env() comes from a setup step not shown here; its return value
# is kept (presumably to restore the previous AWS environment afterwards).
restore_aws_env <- readme_set_aws_env(minio)
# Local MinIO is addressed path-style (bucket in the URL path).
paws_s3 <- paws.storage::s3(
endpoint = minio$endpoint,
region = minio$region,
config = list(s3_force_path_style = TRUE)
)
# A 64 MiB payload of repeating 0..255 bytes, moved in 8 MiB chunks by the
# chunked/concurrent variants.
payload_size <- 64 * 1024 * 1024
payload <- rep(as.raw(0:255), length.out = payload_size)
bench_key <- "bench/payload.bin"
read_chunk <- 8 * 1024 * 1024
write_chunk <- 8 * 1024 * 1024We first compare upload/replace paths: the default Ropendal call, Ropendal with explicit write chunking/concurrency, and paws.storage::put_object().
# Upload benchmark: default replace, chunked/concurrent replace, and a single
# paws PUT. The paws expression returns TRUE, like fs_replace().
bench::mark(
ropendal_replace = fs_replace(s3_fs, bench_key, payload),
ropendal_replace_concurrent = fs_replace(
s3_fs,
bench_key,
payload,
write_concurrency = 4,
chunk_size = write_chunk
),
paws_put = {
paws_s3$put_object(Bucket = minio$bucket, Key = bench_key, Body = payload)
TRUE
},
iterations = 3,
check = FALSE
)[, c("expression", "min", "median", "itr/sec", "mem_alloc", "n_gc")]
#> # A tibble: 3 × 5
#> expression min median `itr/sec` mem_alloc
#> <bch:expr> <bch:tm> <bch:tm> <dbl> <bch:byt>
#> 1 ropendal_replace 126.7ms 130.4ms 7.61 0B
#> 2 ropendal_replace_concurrent 81.8ms 83.5ms 12.0 0B
#> 3 paws_put 544.7ms 544.7ms 1.84 67.7MBThen we compare download paths. The Ropendal rows separate default reads, chunked/concurrent reads, Aio reads, and Aio plus chunked/concurrent reads; paws_get remains the single-GET baseline.
# Download benchmark: default, chunked/concurrent, Aio, and Aio plus
# chunked/concurrent reads, against a single paws GET.
bench::mark(
ropendal_read = fs_read(s3_fs, bench_key),
ropendal_read_concurrent = fs_read(
s3_fs,
bench_key,
read_concurrency = 4,
chunk_size = read_chunk
),
ropendal_read_aio = collect_aio(fs_read_aio(s3_fs, bench_key)),
ropendal_read_aio_concurrent = collect_aio(fs_read_aio(
s3_fs,
bench_key,
read_concurrency = 4,
chunk_size = read_chunk
)),
paws_get = paws_s3$get_object(Bucket = minio$bucket, Key = bench_key)$Body,
iterations = 3,
check = FALSE
)[, c("expression", "min", "median", "itr/sec", "mem_alloc", "n_gc")]
#> # A tibble: 5 × 5
#> expression min median `itr/sec` mem_alloc
#> <bch:expr> <bch:tm> <bch:tm> <dbl> <bch:byt>
#> 1 ropendal_read 70.6ms 70.6ms 14.2 64MB
#> 2 ropendal_read_concurrent 34.8ms 35.4ms 28.3 64MB
#> 3 ropendal_read_aio 39.6ms 47.1ms 21.2 64MB
#> 4 ropendal_read_aio_concurrent 31.6ms 31.6ms 31.7 64MB
#> 5 paws_get 55.4ms 56.7ms 17.1 64.3MBGoogle Drive read example (credentials explicit)
For Google Drive, we pass a credential provider object through auth and keep secret material out of the filesystem handle's printed output. This chunk is evaluated only when local credential files are available.
# Credential locations come from environment variables.
# ROPENDAL_GDRIVE_SECRET_JSON has no fallback and must be set. The tokens file
# defaults to tokens.json next to the client secret.
secret_json <- Sys.getenv("ROPENDAL_GDRIVE_SECRET_JSON")
tokens_json <- Sys.getenv("ROPENDAL_GDRIVE_TOKENS_JSON", unset = file.path(dirname(secret_json), "tokens.json"))
gdrive_root <- Sys.getenv("ROPENDAL_GDRIVE_ROOT", unset = "Ropendal")
gdrive_file <- Sys.getenv("ROPENDAL_GDRIVE_FILE", unset = "map_catalog.txt")
# Credentials go through `auth` as a provider object.
drive_fs <- opendal(
"gdrive",
root = gdrive_root,
auth = credentials_gdrive3(
secret_json = secret_json,
tokens_json = tokens_json,
scope = "https://www.googleapis.com/auth/drive"
)
)
# Read only the first 64 bytes of the file.
drive_head <- rawToChar(fs_read(drive_fs, gdrive_file, size = 64))
drive_head
#> [1] "tr \":\" \" \" < hglft_genome_58b39_637d30_hdl_metal.bed | tr \"-\" \" "Native C API roundtrip
The native API is for other R packages that want OpenDAL-backed byte I/O without calling R while async work is running. A downstream package can declare LinkingTo: Ropendal, include <ropendal.h>, submit async work, wait on Aio handles, and read borrowed results or fill caller-owned buffers.
We exercise that installed C API in-process with Rtinycc. We assemble the C source in pieces, use Rtinycc’s bundled Protothreads header, and let R resume a native state machine while doing ordinary R-side work between resumptions. The Rprintf() calls below are demo instrumentation executed only when R resumes the C task on the main thread; OpenDAL background work does not call R.
root <- tempfile("ropendal-c-api-readme-")
dir.create(root, recursive = TRUE)
# Find the installed Ropendal shared library, searching architecture
# subdirectories too, so the compiled C snippet can link against it.
ropendal_lib <- list.files(
system.file("libs", package = "Ropendal"),
pattern = paste0("Ropendal", .Platform$dynlib.ext, "$"),
recursive = TRUE,
full.names = TRUE
)We start with includes, status codes, and a native task struct. State that must survive a protothread yield lives in the struct, not in local C variables.
# C source, part 1: headers, demo status codes, and the task struct. It is
# kept as an R raw string so the pieces can be pasted together before compiling.
c_api_state <- r"(
#include <rtinycc/pt.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <R_ext/Print.h>
#include "ropendal.h"
enum { ROPENDAL_DEMO_DONE = 0, ROPENDAL_DEMO_RUNNING = 1, ROPENDAL_DEMO_ERROR = -1 };
static const uint8_t demo_payload[] = "hello native api\n";
typedef struct ropendal_demo_task {
struct pt pt;
ropendal_fs_t *fs;
ropendal_aio_t *aio;
ropendal_error_t *err;
ropendal_write_options_t write_opts;
ropendal_read_options_t read_opts;
ropendal_ls_options_t ls_opts;
const ropendal_entry_t *entry;
const ropendal_entry_t *entries;
size_t nentries;
size_t nread;
uint8_t dst[64];
int status;
int failed;
int done;
int tick;
const char *step;
char message[256];
} ropendal_demo_task_t;
)"Next we define cleanup and error helpers plus a constructor. ropendal_fs_open() copies the root configuration during the call, so the R string does not need to outlive the constructor.
# C source, part 2: error handling, cleanup, and the task constructor.
c_api_lifecycle <- r"(
/* Record a failure on the task. The message is copied into task->message
   before task->err is released, because the native error text may point into
   the error object. task->step then references the task-owned copy, so later
   progress reports never read freed memory. */
static void demo_set_error(ropendal_demo_task_t *task, const char *fallback) {
const char *message = fallback ? fallback : "native task failed";
if (!task) return;
if (task->err) {
const char *native_message = ropendal_error_message(task->err);
if (native_message && native_message[0]) message = native_message;
}
snprintf(task->message, sizeof(task->message), "%s", message);
task->step = task->message;
task->failed = 1;
if (task->err) {
ropendal_error_release(task->err);
task->err = 0;
}
}
/* Record the failure, then leave the protothread; the resume entry point reports it. */
#define DEMO_FAIL(task, message) do { demo_set_error((task), (message)); PT_EXIT(&(task)->pt); } while (0)
/* Allocate a task and open an "fs" backend rooted at the given directory.
   Returns NULL only if allocation fails; open errors are recorded on the task. */
void *ropendal_demo_open(const char *root) {
ropendal_demo_task_t *task = (ropendal_demo_task_t *)calloc(1, sizeof(ropendal_demo_task_t));
if (!task) return NULL;
PT_INIT(&task->pt);
task->step = "open filesystem";
ropendal_kv_t cfg = { sizeof(ropendal_kv_t), "root", root };
task->status = ropendal_fs_open("fs", &cfg, 1, &task->fs, &task->err);
if (task->status != ROPENDAL_OK) demo_set_error(task, "ropendal_fs_open failed");
else task->step = "filesystem ready";
return task;
}
/* Release any outstanding Aio, the filesystem handle, and any pending error. */
void ropendal_demo_free(void *ptr) {
ropendal_demo_task_t *task = (ropendal_demo_task_t *)ptr;
if (!task) return;
if (task->aio) ropendal_aio_release(task->aio);
if (task->fs) ropendal_fs_release(task->fs);
if (task->err) ropendal_error_release(task->err);
free(task);
}
)"The protothread body submits async work, yields to R, polls for completion, and then extracts results. This performs write, stat, listing, and a read into a caller-owned C buffer.
# C source, part 3: the protothread body. Each PT_YIELD hands control back to
# R. Protothread local continuations are typically keyed by source line, so each
# yield stays on its own line rather than being folded into a multi-yield macro.
c_api_protothread <- r"(
static int ropendal_demo_resume_internal(ropendal_demo_task_t *task) {
PT_BEGIN(&task->pt);
task->step = "submit write";
memset(&task->write_opts, 0, sizeof(task->write_opts));
task->write_opts.struct_size = sizeof(task->write_opts);
task->write_opts.path = "native.txt";
task->status = ropendal_write_aio(
task->fs, &task->write_opts, demo_payload, sizeof(demo_payload) - 1, &task->aio, &task->err
);
if (task->status != ROPENDAL_OK) DEMO_FAIL(task, "write submit failed");
task->step = "wait write";
PT_YIELD(&task->pt);
while (ropendal_aio_poll(task->aio) == ROPENDAL_AIO_PENDING) PT_YIELD(&task->pt);
task->status = ropendal_aio_wait(task->aio, -1, &task->err);
if (task->status != ROPENDAL_OK) DEMO_FAIL(task, "write wait failed");
ropendal_aio_release(task->aio);
task->aio = 0;
task->step = "submit stat";
task->status = ropendal_stat_aio(task->fs, "native.txt", 0, 0, &task->aio, &task->err);
if (task->status != ROPENDAL_OK) DEMO_FAIL(task, "stat submit failed");
task->step = "wait stat";
PT_YIELD(&task->pt);
while (ropendal_aio_poll(task->aio) == ROPENDAL_AIO_PENDING) PT_YIELD(&task->pt);
task->status = ropendal_aio_wait(task->aio, -1, &task->err);
if (task->status != ROPENDAL_OK) DEMO_FAIL(task, "stat wait failed");
task->status = ropendal_aio_result_entry(task->aio, &task->entry, &task->err);
if (task->status != ROPENDAL_OK || !task->entry || !task->entry->has_content_length) {
DEMO_FAIL(task, "stat result failed");
}
if (task->entry->content_length != sizeof(demo_payload) - 1) DEMO_FAIL(task, "unexpected stat size");
ropendal_aio_release(task->aio);
task->aio = 0;
task->step = "submit list";
memset(&task->ls_opts, 0, sizeof(task->ls_opts));
task->ls_opts.struct_size = sizeof(task->ls_opts);
task->ls_opts.path = "";
task->status = ropendal_ls_aio(task->fs, &task->ls_opts, &task->aio, &task->err);
if (task->status != ROPENDAL_OK) DEMO_FAIL(task, "ls submit failed");
task->step = "wait list";
PT_YIELD(&task->pt);
while (ropendal_aio_poll(task->aio) == ROPENDAL_AIO_PENDING) PT_YIELD(&task->pt);
task->status = ropendal_aio_wait(task->aio, -1, &task->err);
if (task->status != ROPENDAL_OK) DEMO_FAIL(task, "ls wait failed");
task->status = ropendal_aio_result_entries(task->aio, &task->entries, &task->nentries, &task->err);
if (task->status != ROPENDAL_OK || task->nentries == 0) DEMO_FAIL(task, "ls result failed");
ropendal_aio_release(task->aio);
task->aio = 0;
task->step = "submit read_into";
memset(&task->read_opts, 0, sizeof(task->read_opts));
memset(task->dst, 0, sizeof(task->dst));
task->read_opts.struct_size = sizeof(task->read_opts);
task->read_opts.path = "native.txt";
task->status = ropendal_read_into_aio(task->fs, &task->read_opts, task->dst, sizeof(task->dst), &task->aio, &task->err);
if (task->status != ROPENDAL_OK) DEMO_FAIL(task, "read_into submit failed");
task->step = "wait read_into";
PT_YIELD(&task->pt);
while (ropendal_aio_poll(task->aio) == ROPENDAL_AIO_PENDING) PT_YIELD(&task->pt);
task->status = ropendal_aio_wait(task->aio, -1, &task->err);
if (task->status != ROPENDAL_OK) DEMO_FAIL(task, "read_into wait failed");
task->status = ropendal_aio_result_nread(task->aio, &task->nread, &task->err);
if (task->status != ROPENDAL_OK) DEMO_FAIL(task, "read_into nread failed");
if (task->nread != sizeof(demo_payload) - 1 || memcmp(task->dst, demo_payload, task->nread) != 0) {
DEMO_FAIL(task, "read_into bytes mismatch");
}
ropendal_aio_release(task->aio);
task->aio = 0;
task->step = "complete";
task->done = 1;
PT_END(&task->pt);
}
)"Finally we expose small C entry points for Rtinycc to call.
# C source, part 4: entry points bound from R. ropendal_demo_resume() returns
# ROPENDAL_DEMO_RUNNING (1), DONE (0), or ERROR (-1), and prints one progress
# line per call via Rprintf().
c_api_exports <- r"(
static const char *ropendal_demo_status_name(int status) {
if (status == ROPENDAL_DEMO_RUNNING) return "running";
if (status == ROPENDAL_DEMO_DONE) return "done";
return "error";
}
static int ropendal_demo_report(ropendal_demo_task_t *task, int status) {
task->tick += 1;
Rprintf(
"native request %d: %s; %s\n",
task->tick,
ropendal_demo_status_name(status),
task->step ? task->step : "unknown"
);
return status;
}
int ropendal_demo_resume(void *ptr) {
ropendal_demo_task_t *task = (ropendal_demo_task_t *)ptr;
if (!task) return ROPENDAL_DEMO_ERROR;
if (task->failed) return ropendal_demo_report(task, ROPENDAL_DEMO_ERROR);
if (task->done) return ropendal_demo_report(task, ROPENDAL_DEMO_DONE);
task->status = ropendal_demo_resume_internal(task);
if (task->failed) return ropendal_demo_report(task, ROPENDAL_DEMO_ERROR);
if (task->status == PT_YIELDED || task->status == PT_WAITING) {
return ropendal_demo_report(task, ROPENDAL_DEMO_RUNNING);
}
task->done = 1;
return ropendal_demo_report(task, ROPENDAL_DEMO_DONE);
}
int ropendal_demo_nread(void *ptr) {
ropendal_demo_task_t *task = (ropendal_demo_task_t *)ptr;
return task ? (int)task->nread : -1;
}
const char *ropendal_demo_error(void *ptr) {
ropendal_demo_task_t *task = (ropendal_demo_task_t *)ptr;
return task ? task->message : "null task";
}
)"We compile and bind that native code in memory.
# Join the C pieces in order, compile them in memory against the Rtinycc and
# Ropendal headers and the installed Ropendal library, and declare the
# R-visible signature of each exported entry point.
c_api_code <- paste(c_api_state, c_api_lifecycle, c_api_protothread, c_api_exports, sep = "\n")
ffi <- Rtinycc::tcc_ffi() |>
Rtinycc::tcc_include(system.file("include", package = "Rtinycc")) |>
Rtinycc::tcc_include(system.file("include", package = "Ropendal")) |>
Rtinycc::tcc_library(ropendal_lib[[1]]) |>
Rtinycc::tcc_source(c_api_code) |>
Rtinycc::tcc_bind(
ropendal_demo_open = list(args = list("cstring"), returns = "ptr"),
ropendal_demo_resume = list(args = list("ptr"), returns = "i32"),
ropendal_demo_nread = list(args = list("ptr"), returns = "i32"),
ropendal_demo_error = list(args = list("ptr"), returns = "cstring"),
ropendal_demo_free = list(args = list("ptr"), returns = "void")
) |>
Rtinycc::tcc_compile()R controls the scheduling loop. Between native resumptions we do ordinary R work; the native code never creates R objects while OpenDAL I/O is running.
# R drives the scheduler. Each resume advances the native task to its next
# yield point, and ordinary R work runs in between. A positive status means
# still running, 0 means done, and a negative value means the task failed.
task <- ffi$ropendal_demo_open(root)
status <- 1L
r_ticks <- 0L
r_work <- 0L
while (status > 0L) {
status <- ffi$ropendal_demo_resume(task)
r_ticks <- r_ticks + 1L
r_work <- r_work + sum(seq_len(1000L))
Sys.sleep(0.001)
}
#> native request 1: running; wait write
#> native request 2: running; wait write
#> native request 3: running; wait write
#> native request 4: running; wait stat
#> native request 5: running; wait list
#> native request 6: running; wait read_into
#> native request 7: done; complete
if (status < 0L) {
# Copy the error text out before freeing the task that owns it.
err_msg <- ffi$ropendal_demo_error(task)
ffi$ropendal_demo_free(task)
stop(err_msg, call. = FALSE)
}
nread <- ffi$ropendal_demo_nread(task)
# The bound free function returns NULL; keep it out of the printed output.
invisible(ffi$ropendal_demo_free(task))
c(
r_ticks = r_ticks,
r_work = r_work,
bytes_read_into_c_buffer = nread
)
#> r_ticks r_work bytes_read_into_c_buffer
#> 7 3503500 17Development
To list the common development targets, run:
make --no-print-directory help
Common development targets:
make rd regenerate savvy wrappers, roxygen docs, and NAMESPACE
make test-fast install current source and run non-network tinytest
make test-http run opt-in local HTTP fixture tests
make test-s3 run opt-in public read-only S3-compatible tests
make test-s3-minio start local MinIO and run writable S3-compatible tests
make test-gdrive run opt-in Google Drive tests using local gdrive3 JSON defaults
make test-ci run C API checks and CI-only tinytest
make rdm render README.md from README.Rmd
make bench-minio-paws render development MinIO benchmark
make check build and run R CMD check --as-cran --no-manual