
weighted quantile summaries, power iteration clustering, spark_write_rds(), and more

Sparklyr 1.6 is now available on CRAN!

To install sparklyr 1.6 from CRAN, run
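
install.packages("sparklyr")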

In this blog post, we will highlight the following features and improvements from sparklyr 1.6:

  • Weighted quantile summaries

  • Power iteration clustering

  • spark_write_rds() + collect_from_rds()

  • dplyr-related improvements

Weighted quantile summaries

Apache Spark is well-known for supporting approximate algorithms that trade off marginal amounts of accuracy for greater speed and parallelism. Such algorithms are particularly useful for performing preliminary data explorations at scale, as they enable users to quickly query certain estimated statistics within a predefined error margin, while avoiding the high cost of exact computations. One example is the Greenwald-Khanna algorithm for online computation of quantile summaries, as described in Greenwald and Khanna (2001). This algorithm was originally designed for efficient \(\epsilon\)-approximation of quantiles within a large dataset without the notion of data points carrying different weights, and the unweighted version of it has been implemented as approxQuantile() since Spark 2.0. However, the same algorithm can be generalized to handle weighted inputs, and as sparklyr user @Zhuk66 mentioned in this issue, a weighted version of this algorithm makes for a useful sparklyr feature.

To properly explain what weighted quantile means, we must clarify what the weight of each data point signifies. For example, if we have a sequence of observations \((1, 1, 1, 1, 0, 2, -1, -1)\), and would like to approximate the median of all data points, then we have the following two options:

  • Either run the unweighted version of approxQuantile() in Spark to scan through all 8 data points,

  • Or alternatively, “compress” the data into 4 tuples of (value, weight): \((1, 0.5), (0, 0.125), (2, 0.125), (-1, 0.25)\), where the second component of each tuple represents how often a value occurs relative to the rest of the observed values, and then find the median by scanning through the 4 tuples using the weighted version of the Greenwald-Khanna algorithm (illustrated in the sketch right after this list)
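
As a quick sketch of the second option (the variable names below are purely illustrative), the compressed tuples can be fed to sdf_quantile() with its weight.column argument from sparklyr 1.6:

library(sparklyr)

sc <- spark_connect(master = "local")

# the 8 observations above, "compressed" into (value, weight) tuples
compressed <- data.frame(
  x = c(1, 0, 2, -1),
  weight = c(0.5, 0.125, 0.125, 0.25)
)
compressed_sdf <- copy_to(sc, compressed, name = random_string())

# approximate the weighted median
compressed_sdf %>%
  sdf_quantile(
    column = "x",
    weight.column = "weight",
    probabilities = 0.5,
    relative.error = 0.01
  )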

We can also run through a contrived example involving the standard normal distribution to illustrate the power of weighted quantile estimation in sparklyr 1.6. Suppose we cannot simply run qnorm() in R to evaluate the quantile function of the standard normal distribution at \(p = 0.25\) and \(p = 0.75\); how can we get some vague idea about the 1st and 3rd quartiles of this distribution? One way is to sample a large number of data points from this distribution, and then apply the Greenwald-Khanna algorithm to our unweighted samples, as shown below:

library(sparklyr)

sc <- spark_connect(master = "local")

num_samples <- 1e6
samples <- data.frame(x = rnorm(num_samples))

samples_sdf <- copy_to(sc, samples, name = random_string())

samples_sdf %>%
  sdf_quantile(
    column = "x",
    probabilities = c(0.25, 0.75),
    relative.error = 0.01
  ) %>%
  print()
##        25%        75%
## -0.6629242  0.6874939

Notice that because we are working with an approximate algorithm, and have specified relative.error = 0.01, the estimated value of \(-0.6629242\) from above could be anywhere between the 24th and the 26th percentile of all samples. In fact, it falls in the \(25.36896\)-th percentile:
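
One way to check this, assuming the samples data frame from the snippet above is still available, is to evaluate the empirical CDF of our samples at the estimated value:

# empirical CDF of the one million samples, evaluated at the estimate
samples_cdf <- ecdf(samples$x)
samples_cdf(-0.6629242)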

## [1] 0.2536896

Now how do we make use of weighted quantile estimation from sparklyr 1.6 to obtain similar results? Simple! We can sample a large number of \(x\) values uniformly randomly from \((-\infty, \infty)\) (or alternatively, just select a large number of values evenly spaced between \((-M, M)\) where \(M\) is approximately \(\infty\)), and assign each \(x\) value a weight of \(\displaystyle \frac{1}{\sqrt{2 \pi}}e^{-\frac{x^2}{2}}\), the standard normal distribution’s probability density at \(x\). Finally, we run the weighted version of sdf_quantile() from sparklyr 1.6, as shown below:

library(sparklyr)

sc <- spark_connect(master = "local")

num_samples <- 1e6
M <- 1000
samples <- tibble::tibble(
  x = M * seq(-num_samples / 2 + 1, num_samples / 2) / num_samples,
  weight = dnorm(x)
)

samples_sdf <- copy_to(sc, samples, name = random_string())

samples_sdf %>%
  sdf_quantile(
    column = "x",
    weight.column = "weight",
    probabilities = c(0.25, 0.75),
    relative.error = 0.01
  ) %>%
  print()
##    25%    75%
## -0.696  0.662

Voilà! The estimates are not too far off from the 25th and 75th percentiles (in relation to our abovementioned maximum permissible error of \(0.01\)):
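
One way to check, using base R’s pnorm() for the exact standard normal CDF, is to evaluate it at both estimates:

# exact standard-normal CDF values at the two estimates above
pnorm(-0.696)
pnorm(0.662)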

## [1] 0.2432144
## [1] 0.7460144

Power iteration clustering

Power iteration clustering (PIC), a simple and scalable graph clustering method presented in Lin and Cohen (2010), first finds a low-dimensional embedding of a dataset, using truncated power iteration on a normalized pairwise-similarity matrix of all data points, and then uses this embedding as the “cluster indicator,” an intermediate representation of the dataset that leads to fast convergence when used as input to k-means clustering. This process is very well illustrated in figure 1 of Lin and Cohen (2010) (reproduced below),

in which the leftmost image is the visualization of a dataset consisting of three circles, with points colored in red, green, and blue indicating clustering results, and the subsequent images show the power iteration process gradually transforming the original set of points into what appears to be three disjoint line segments, an intermediate representation that can then be rapidly separated into 3 clusters using k-means clustering with \(k = 3\).

In sparklyr 1.6, ml_power_iteration() was implemented to make the PIC functionality in Spark accessible from R. It expects as input a 3-column Spark dataframe that represents a pairwise-similarity matrix of all data points. Two of the columns in this dataframe should contain 0-based row and column indices, and the third column should hold the corresponding similarity measure. In the example below, we will see a dataset consisting of two circles being easily separated into two clusters by ml_power_iteration(), with the Gaussian kernel being used as the similarity measure between any 2 points:

gen_similarity_matrix <- function() {
  # Gaussian similarity measure
  gaussian_similarity <- function(pt1, pt2) {
    exp(-sum((pt2 - pt1) ^ 2) / 2)
  }
  # generate evenly distributed points on a circle centered at the origin
  gen_circle <- function(radius, num_pts) {
    seq(0, num_pts - 1) %>%
      purrr::map_dfr(
        function(idx) {
          theta <- 2 * pi * idx / num_pts
          radius * c(x = cos(theta), y = sin(theta))
        })
  }
  # generate points on both circles
  pts <- rbind(
    gen_circle(radius = 1, num_pts = 80),
    gen_circle(radius = 4, num_pts = 80)
  )
  # populate the pairwise similarity matrix (stored as a 3-column dataframe)
  similarity_matrix <- data.frame()
  for (i in seq(2, nrow(pts)))
    similarity_matrix <- similarity_matrix %>%
      rbind(seq(i - 1L) %>%
        purrr::map_dfr(~ list(
          src = i - 1L, dst = .x - 1L,
          similarity = gaussian_similarity(pts[i, ], pts[.x, ])
        ))
      )

  similarity_matrix
}

library(sparklyr)

sc <- spark_connect(master = "local")
sdf <- copy_to(sc, gen_similarity_matrix())
clusters <- ml_power_iteration(
  sdf, k = 2, max_iter = 10, init_mode = "degree",
  src_col = "src", dst_col = "dst", weight_col = "similarity"
)

clusters %>% print(n = 160)
## # A tibble: 160 x 2
##        id cluster
##     <dbl>   <int>
##   1     0       1
##   2     1       1
##   3     2       1
##   4     3       1
##   5     4       1
##   ...
##   157   156       0
##   158   157       0
##   159   158       0
##   160   159       0

The output shows points from the two circles being assigned to separate clusters, as expected, after only a small number of PIC iterations.

spark_write_rds() + collect_from_rds()

spark_write_rds() and collect_from_rds() are implemented as a less memory-consuming alternative to collect(). Unlike collect(), which retrieves all elements of a Spark dataframe through the Spark driver node, hence potentially causing slowness or out-of-memory failures when collecting large amounts of data, spark_write_rds(), when used in conjunction with collect_from_rds(), can retrieve all partitions of a Spark dataframe directly from Spark workers, rather than through the Spark driver node. First, spark_write_rds() will distribute the tasks of serializing Spark dataframe partitions in RDS version 2 format among Spark workers. Spark workers can then process multiple partitions in parallel, each handling one partition at a time and persisting the RDS output directly to disk, rather than sending dataframe partitions to the Spark driver node. Finally, the RDS outputs can be re-assembled into R dataframes using collect_from_rds().

Shown below is an example of spark_write_rds() + collect_from_rds() usage, where RDS outputs are first saved to HDFS, then downloaded to the local filesystem with hadoop fs -get, and finally, post-processed with collect_from_rds():

library(sparklyr)
library(nycflights13)

num_partitions <- 10L
sc <- spark_connect(grasp = "yarn", spark_home = "/usr/lib/spark")
flights_sdf <- copy_to(sc, flights, repartition = num_partitions)

# Spark workers serialize all partitions in RDS format in parallel and write RDS
# outputs to HDFS
spark_write_rds(
  flights_sdf,
  dest_uri = "hdfs://<namenode>:8020/flights-part-{partitionId}.rds"
)

# Run `hadoop fs -get` to download RDS files from HDFS to the local file system
for (partition in seq(num_partitions) - 1)
  system2(
    "hadoop",
    c("fs", "-get", sprintf("hdfs://<namenode>:8020/flights-part-%d.rds", partition))
  )

# Post-process RDS outputs
partitions <- (seq(num_partitions) - 1) %>%
  lapply(function(partition) collect_from_rds(sprintf("flights-part-%d.rds", partition)))

# Optionally, call `rbind()` to combine data from all partitions into a single R dataframe
flights_df <- do.call(rbind, partitions)

Similar to other recent sparklyr releases, sparklyr 1.6 comes with a number of dplyr-related improvements, such as

  • Support for the where() predicate within select() and summarize(across(...)) operations on Spark dataframes

  • Addition of if_all() and if_any() functions

  • Full compatibility with dbplyr 2.0 backend API

select(where(...)) and summarize(across(where(...)))

The dplyr where(...) construct is useful for applying a selection or aggregation function to multiple columns that satisfy some boolean predicate. For example,
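
library(dplyr)

iris %>% select(where(is.numeric))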

returns all numeric columns from the iris dataset, and
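
iris %>% summarize(across(where(is.numeric), mean))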

computes the average of each numeric column.

In sparklyr 1.6, both types of operations can also be applied to Spark dataframes.
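
For example, the following is a minimal sketch of both, assuming a local Spark connection and a Spark copy of the iris dataset (the iris_sdf name is purely illustrative):

library(dplyr)
library(sparklyr)

sc <- spark_connect(master = "local")
iris_sdf <- copy_to(sc, iris, name = random_string())

# select all numeric columns of the Spark dataframe
iris_sdf %>% select(where(is.numeric))

# compute the average of each numeric column of the Spark dataframe
iris_sdf %>% summarize(across(where(is.numeric), mean))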

if_all() and if_any()

if_all() and if_any() are two convenience functions from dplyr 1.0.4 (see here for more details) that effectively combine the results of applying a boolean predicate to a tidy selection of columns using the logical and/or operators.

Starting from sparklyr 1.6, if_all() and if_any() can also be applied to Spark dataframes.
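
For example, a minimal sketch reusing the iris_sdf Spark dataframe from above (the filtering thresholds are arbitrary and purely illustrative):

# keep only rows in which every numeric column exceeds 2
iris_sdf %>% filter(if_all(where(is.numeric), ~ .x > 2))

# keep rows in which at least one numeric column exceeds 7
iris_sdf %>% filter(if_any(where(is.numeric), ~ .x > 7))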

Compatibility with dbplyr 2.0 backend API

Sparklyr 1.6 is fully compatible with the newer dbplyr 2.0 backend API (by implementing all interface changes recommended here), while still maintaining backward compatibility with the previous edition of the dbplyr API, so that sparklyr users will not be forced to switch to any particular version of dbplyr.

This should be a mostly non-user-visible change as of now. In fact, the only discernible behavior change will be the following code
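
(a minimal sketch, assuming dbplyr 2.0+ is installed so that its dbplyr_edition() generic is available)

library(dbplyr)
library(sparklyr)

sc <- spark_connect(master = "local")

# report which dbplyr backend API edition sparklyr uses for this connection
print(dbplyr_edition(sc))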

outputting

[1] 2

if sparklyr is working with dbplyr 2.0+, and

[1] 1

otherwise.

Acknowledgements

In chronological order, we would like to thank the following contributors for making sparklyr 1.6 awesome:

We would also like to give a big shout-out to the wonderful open-source community behind sparklyr, without whom we would not have benefitted from numerous sparklyr-related bug reports and feature suggestions.

Finally, the author of this blog post also very much appreciates the highly valuable editorial suggestions from @skeydan.

If you wish to learn more about sparklyr, we recommend checking out sparklyr.ai, spark.rstudio.com, and also some previous sparklyr release posts such as sparklyr 1.5 and sparklyr 1.4.

That’s all. Thanks for reading!

Greenwald, Michael, and Sanjeev Khanna. 2001. “Space-Efficient Online Computation of Quantile Summaries.” SIGMOD Rec. 30 (2): 58–66. https://doi.org/10.1145/376284.375670.

Lin, Frank, and William Cohen. 2010. “Power Iteration Clustering.” In Proceedings of the 27th International Conference on Machine Learning (ICML), 655–62.

