Skip to content

Commit

Permalink
Merge pull request #42 from jameslamb/retry-logic
Browse files Browse the repository at this point in the history
replaced single-shot HTTP requests with retry logic
  • Loading branch information
hrbrmstr committed May 6, 2020
2 parents fcd90f2 + b5c49f9 commit d325ac8
Show file tree
Hide file tree
Showing 10 changed files with 53 additions and 37 deletions.
3 changes: 2 additions & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ Authors@R: c(
person("Andy", "Hine", email = "andyyhine@gmail.com", role = "ctb"),
person("Scott", "Came", email = "scottcame10@gmail.com", role = "ctb"),
person("David", "Severski", email = "davidski@deadheaven.com", role = "ctb",
comment = c(ORCID = "https://orcid.org/0000-0001-7867-0459"))
comment = c(ORCID = "https://orcid.org/0000-0001-7867-0459")),
person("James", "Lamb", email = "jaylamb20@gmail.com", role = "ctb")
)
Description: 'Apache Drill' is a low-latency distributed query engine designed to enable
data exploration and 'analytics' on both relational and non-relational 'datastores',
Expand Down
20 changes: 13 additions & 7 deletions R/dbi.r
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
s_head <- purrr::safely(httr::HEAD)
s_head <- purrr::safely(function(...){httr::RETRY(verb = "HEAD", ...)})

#' Driver for Drill database.
#'
Expand Down Expand Up @@ -165,27 +165,31 @@ setMethod(

if (.progress) {

httr::POST(
httr::RETRY(
verb = "POST",
url = res@drill_server,
path = "/query.json",
encode = "json",
progress(),
body = list(
queryType = "SQL",
query = res@statement
)
),
terminate_on = c(403, 404)
) -> resp

} else {

httr::POST(
httr::RETRY(
verb = "POST",
url = res@drill_server,
path = "/query.json",
encode = "json",
body = list(
queryType = "SQL",
query = res@statement
)
),
terminate_on = c(403, 404)
) -> resp

}
Expand Down Expand Up @@ -426,11 +430,13 @@ setMethod(
'dbListFields',
signature(conn='DrillResult', name='missing'),
function(conn, name) {
httr::POST(
httr::RETRY(
verb = "POST",
sprintf("%s/query.json", conn@drill_server),
encode = "json",
body = list(queryType="SQL", query=conn@statement
)
),
terminate_on = c(403, 404)
) -> res

# fatal query error on the Drill side so return no fields
Expand Down
13 changes: 8 additions & 5 deletions R/query.r
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
#' ignored if \code{drill_con} is a \code{JDBCConnection} created by
#' \code{drill_jdbc()})
#' @param .progress if \code{TRUE} (default if in an interactive session) then ask
#' \code{httr::POST} to display a progress bar
#' \code{httr::RETRY} to display a progress bar
#' @references \href{https://drill.apache.org/docs/}{Drill documentation}
#' @family Drill direct REST API Interface
#' @export
Expand All @@ -40,23 +40,26 @@ drill_query <- function(drill_con, query, uplift=TRUE, .progress=interactive())
drill_server <- make_server(drill_con)

if (.progress) {
httr::POST(
httr::RETRY(
"POST",
url = sprintf("%s/query.json", drill_server),
encode = "json",
httr::progress(),
body = list(
queryType = "SQL",
query = query
)
),
terminate_on = c(403, 404)
) -> res
} else {
httr::POST(
httr::RETRY(
url = sprintf("%s/query.json", drill_server),
encode = "json",
body = list(
queryType = "SQL",
query = query
)
),
terminate_on = c(403, 404)
) -> res
}

Expand Down
34 changes: 19 additions & 15 deletions R/rest-api.r
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
s_head <- purrr::safely(httr::HEAD)
s_head <- purrr::safely(function(...){httr::RETRY(verb = "HEAD", ...)})

#' Setup a Drill connection
#'
Expand Down Expand Up @@ -61,7 +61,7 @@ drill_active <- function(drill_con) {
#' }
drill_status <- function(drill_con) {
drill_server <- make_server(drill_con)
res <- httr::GET(sprintf("%s/status", drill_server))
res <- httr::RETRY("GET", sprintf("%s/status", drill_server), terminate_on = c(403, 404))
httr::stop_for_status(res)
cnt <- httr::content(res, as="text", encoding="UTF-8")
cnt <- htmltools::HTML(cnt)
Expand All @@ -78,7 +78,7 @@ drill_status <- function(drill_con) {
#' }
drill_metrics <- function(drill_con) {
drill_server <- make_server(drill_con)
res <- httr::GET(sprintf("%s/status/metrics", drill_server))
res <- httr::RETRY("GET", sprintf("%s/status/metrics", drill_server), terminate_on = c(403, 404))
httr::stop_for_status(res)
cnt <- httr::content(res, as="text", encoding="UTF-8")
jsonlite::fromJSON(cnt, flatten=TRUE)
Expand All @@ -95,7 +95,7 @@ drill_metrics <- function(drill_con) {
#' }
drill_threads <- function(drill_con) {
drill_server <- make_server(drill_con)
res <- httr::GET(sprintf("%s/status/threads", drill_server))
res <- httr::RETRY("GET", sprintf("%s/status/threads", drill_server), terminate_on = c(403, 404))
httr::stop_for_status(res)
cnt <- httr::content(res, as="text", encoding="UTF-8")
cnt <- htmltools::HTML(sprintf("<pre>%s</pre>", cnt))
Expand All @@ -113,7 +113,7 @@ drill_threads <- function(drill_con) {
#' }
drill_profiles <- function(drill_con) {
drill_server <- make_server(drill_con)
res <- httr::GET(sprintf("%s/profiles.json", drill_server))
res <- httr::RETRY("GET", sprintf("%s/profiles.json", drill_server), terminate_on = c(403, 404))
httr::stop_for_status(res)
cnt <- httr::content(res, as="text", encoding="UTF-8")
jsonlite::fromJSON(cnt)
Expand All @@ -128,7 +128,7 @@ drill_profiles <- function(drill_con) {
#' @export
drill_profile <- function(drill_con, query_id) {
drill_server <- make_server(drill_con)
res <- httr::GET(sprintf("%s/profiles/%s.json", drill_server, query_id))
res <- httr::RETRY("GET", sprintf("%s/profiles/%s.json", drill_server, query_id), terminate_on = c(403, 404))
httr::stop_for_status(res)
cnt <- httr::content(res, as="text", encoding="UTF-8")
jsonlite::fromJSON(cnt)
Expand All @@ -143,7 +143,7 @@ drill_profile <- function(drill_con, query_id) {
#' @export
drill_cancel <- function(drill_con, query_id) {
drill_server <- make_server(drill_con)
res <- httr::GET(sprintf("%s/profiles/cancel/%s", drill_server, query_id))
res <- httr::RETRY("GET", sprintf("%s/profiles/cancel/%s", drill_server, query_id), terminate_on = c(403, 404))
httr::stop_for_status(res)
message(httr::content(res, as="text", encoding="UTF-8"))
invisible(TRUE)
Expand Down Expand Up @@ -195,9 +195,9 @@ drill_storage <- function(drill_con, plugin=NULL, as=c("tbl", "list", "raw")) {
drill_server <- make_server(drill_con)

if (is.null(plugin)) {
res <- httr::GET(sprintf("%s/storage.json", drill_server))
res <- httr::RETRY("GET", sprintf("%s/storage.json", drill_server), terminate_on = c(403, 404))
} else {
res <- httr::GET(sprintf("%s/storage/%s.json", drill_server, plugin))
res <- httr::RETRY("GET", sprintf("%s/storage/%s.json", drill_server, plugin), terminate_on = c(403, 404))
}

httr::stop_for_status(res)
Expand Down Expand Up @@ -225,11 +225,13 @@ drill_mod_storage <- function(drill_con, name, config) {

drill_server <- make_server(drill_con)

httr::POST(
httr::RETRY(
verb = "POST",
url = sprintf("%s/storage/%s.json", drill_server, name),
httr::content_type_json(),
body = config,
encode = "raw"
encode = "raw",
terminate_on = c(403, 404)
) -> res

httr::stop_for_status(res)
Expand All @@ -247,9 +249,11 @@ drill_rm_storage <- function(drill_con, name) {

drill_server <- make_server(drill_con)

httr::DELETE(
httr::RETRY(
verb = "DELETE",
url = sprintf("%s/storage/%s.json", drill_server, name),
httr::content_type_json()
httr::content_type_json(),
terminate_on = c(403, 404)
) -> res

httr::stop_for_status(res)
Expand All @@ -271,7 +275,7 @@ drill_rm_storage <- function(drill_con, name) {
#' }
drill_options <- function(drill_con, pattern=NULL) {
drill_server <- make_server(drill_con)
res <- httr::GET(sprintf("%s/options.json", drill_server))
res <- httr::RETRY("GET", sprintf("%s/options.json", drill_server), terminate_on = c(403, 404))
httr::stop_for_status(res)
cnt <- httr::content(res, as="text", encoding="UTF-8")
jsonlite::fromJSON(cnt) %>%
Expand All @@ -291,7 +295,7 @@ drill_options <- function(drill_con, pattern=NULL) {
#' }
drill_stats <- function(drill_con) {
drill_server <- make_server(drill_con)
res <- httr::GET(sprintf("%s/cluster.json", drill_server))
res <- httr::RETRY("GET", sprintf("%s/cluster.json", drill_server), terminate_on = c(403, 404))
httr::stop_for_status(res)
cnt <- httr::content(res, as="text", encoding="UTF-8")
jsonlite::fromJSON(cnt)
Expand Down
6 changes: 3 additions & 3 deletions R/schemas.R
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#'
#' @param drill_con drill server connection object setup by \code{drill_connection()}
#' @param .progress if \code{TRUE} (default if in an interactive session) then ask
#' \code{httr::POST} to display a progress bar
#' \code{httr::RETRY} to display a progress bar
#' @references \href{https://drill.apache.org/docs/}{Drill documentation}
#' @family Dill direct REST API Interface
#' @export
Expand All @@ -16,7 +16,7 @@ drill_show_schemas <- function(drill_con, .progress=interactive()) {
#' @param schema_name A unique name for a Drill schema. A schema in Drill is a configured
#' storage plugin, such as hive, or a storage plugin and workspace.
#' @param .progress if \code{TRUE} (default if in an interactive session) then ask
#' \code{httr::POST} to display a progress bar
#' \code{httr::RETRY} to display a progress bar
#' @references \href{https://drill.apache.org/docs/}{Drill documentation}
#' @family Dill direct REST API Interface
#' @export
Expand All @@ -32,7 +32,7 @@ drill_use <- function(drill_con, schema_name, .progress=interactive()) {
#' @param drill_con drill server connection object setup by \code{drill_connection()}
#' @param schema_spec properly quoted "filesystem.directory_name" reference path
#' @param .progress if \code{TRUE} (default if in an interactive session) then ask
#' \code{httr::POST} to display a progress bar
#' \code{httr::RETRY} to display a progress bar
#' @export
#' @references \href{https://drill.apache.org/docs/}{Drill documentation}
#' @family Dill direct REST API Interface
Expand Down
6 changes: 4 additions & 2 deletions R/utils.r
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,16 @@ auth_drill <- function(ssl, host, port, username, password) {

httr::set_config(config(ssl_verifypeer = 0L))

httr::POST(
httr::RETRY(
"POST",
url = sprintf("%s://%s:%s", ifelse(ssl[1], "https", "http"), host, port),
path = "/j_security_check",
encode = "form",
body = list(
j_username = username,
j_password = password
)
),
terminate_on = c(403, 404)
) -> res

httr::stop_for_status(res)
Expand Down
2 changes: 1 addition & 1 deletion man/drill_query.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion man/drill_show_files.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion man/drill_show_schemas.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion man/drill_use.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit d325ac8

Please sign in to comment.