Am I the only one seeing here a sparse linear regression? Hopefully by the end of this post you’ll see it too (:

Its been a while since my last blog post as I’ve started working at SimilarWeb - A company that provides data on internet traffic for various use cases. During this time I’ve encountered an interesting ML problem setup that requires dealing with quite a few technical hurdles. In this blog post I’d like to share the problem and Scala spark recipe code to tackle it at scale.

## Some motivation

Since I can’t really divulge the specifics of the problem I’m working on I’ll illustrate the problem setup with a (possibly) imaginary scenario.

Let’s imagine we’re working for a credit card company. We issue credit cards to consumers who go and spend money at different businesses. Knowing the monthly revenue those business generate can be instrumental for pricing and marketing purposes.

To try and estimate business monthly revenue we asked a small subset of the businesses to disclose their last month revenue. We’ll denote the last month revenue for business \(i\) by \(y_i\).

Next, we have at our disposal purchase information from our credit card holders. Let’s denote the total money spent by customer \(j\) at business \(i\) by \(x_{ij}\).

We can assume that the total purchases by our credit card holders at a given business \(i\): \(x_i=\Sigma x_{ij}\) represents only a fraction of the total revenue \(y_i\). We can thus assume that there’s some inflation factor \(\beta\) such that:

\[y_i = x_i \beta + \epsilon_i\]

Estimation of \(\beta\) would amount to a simple regression. We can then use the model to estimate monthly revenue for all the other businesses where our card holders are spending money.

Taking the above idea one step further, we can imagine that different card holders might represent very different purchasing patterns, which could necessitate different inflation factor \(\beta_j\) for every consumer \(j\). This would amount to assuming that:

\[y_i = \Sigma x_{ij}\beta_j + \epsilon_i\] and in matrix notation:

\[Y=X\beta + \epsilon\] Estimation of the above equation might seem like a simple regression but in practice it introduces several challenges:

- \(X\) is usually very large. It would span as many rows as we have businesses
in our sample and as many columns as the number of credit card holders. In my
application I had 70K rows on 1.8M columns (which roughly translates to 470 GB
assuming we encode the matrix entries as integer).

- \(X\) can have many more columns than it has rows. This kind of matrix is rank defiecient which in practice means we can’t calculate the closed form solution (there’s actually infinite solutions).

## Sparse linear regression in scala spark

Luckily enough Spark Scala has all the tools needed to tackle the above problem at scale!

First, we note that while \(X\) is pretty huge, it’s also usually very sparse. In my application it had the value 0 in 99.95% of it’s entries. That means we can use sparse matrix representation to avoid blowing up our memory.

Next, we discover that the spark ml linear regression implementation can fit the model using gradient descent rather than using the closed form solution. This means we can solve the above optimization problem and arrive at one (of infinite) solutions to the equation.

## Code receipe

Note: The below assumes spark 3.0+

A reasonable place to start our code example is to assume we have some
transnational DataFrame *user_df* for a given month with the following schema:

- user_id: StringType

- business_id: IntegerType
- amount_spent: DoubleType

```
import spark.implicits._
val user_df = Seq(
("ad3a", 1, 14.0),
("ad3a", 0, 13.0),
("3Gd2e", 1, 11.0)
).toDF("user_id", "business_id", "amount_spent")
user_df.show()
```

```
##
## +-------+-----------+------------+
## |user_id|business_id|amount_spent|
## +-------+-----------+------------+
## | ad3a| 1| 14.0|
## | ad3a| 0| 13.0|
## | 3Gd2e| 1| 11.0|
## +-------+-----------+------------+
##
```

We also have a DataFrame containing a sample of businesses total revenues
*biz_df*:

- business_id: IntegerType

- total_revenue: FloatType

```
val biz_df = Seq(
(0, 13.9),
(1, 36.7)
).toDF("business_id", "total_revenue")
biz_df.show()
```

```
##
## +-----------+-------------+
## |business_id|total_revenue|
## +-----------+-------------+
## | 0| 13.9|
## | 1| 36.7|
## +-----------+-------------+
##
```

Let’s start with some preprocesssing in the *user_df* DataFrame. First, we need
to convert *user_id*s to column index in the \(X\) matrix. This means every user
needs to have an integer denoting the matrix column index that represents his
spending. Spark has a great utility that can serve that purpose: *StringIndexer*

```
import org.apache.spark.ml.feature.StringIndexer
val stringIndexer = new StringIndexer()
.setInputCol("user_id")
.setOutputCol("userIndex")
val stringIndexerModel = stringIndexer.fit(user_df)
user_df = stringIndexerModel
.transform(user_df)
.withColumn("userIndex", $"userIndex".cast("Int"))
user_df.show()
```

```
##
## +-------+-----------+------------+---------+
## |user_id|business_id|amount_spent|userIndex|
## +-------+-----------+------------+---------+
## | ad3a| 1| 14.0| 0|
## | ad3a| 0| 13.0| 0|
## | 3Gd2e| 1| 11.0| 1|
## +-------+-----------+------------+---------+
##
```

Next, we’ll collect for every business the users who spent money with them and the corresponding amounts. In the below code we’ll also join in the business total revenue too:

```
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._
val sortUdf: UserDefinedFunction = udf((rows: Seq[Row]) => {
rows.map { case Row(userIndex: Int, amount_spent: Double) => (userIndex, amount_spent) }
.sortBy { case (userIndex, amount_spent) => userIndex }
})
val user_business_amount_spent_df = user_df
.groupBy("business_id")
.agg(collect_list(struct("userIndex", "amount_spent")).alias("user_amount_spent_list"))
.select($"business_id", sortUdf($"user_amount_spent_list").alias("user_amount_spent_list"))
.withColumn("userIndex_list", $"user_amount_spent_list".getField("_1"))
.withColumn("amount_spent_list", $"user_amount_spent_list".getField("_2"))
.drop("user_amount_spent_list")
.join(biz_df, "business_id")
user_business_amount_spent_df.show()
```

```
##
## +-----------+--------------+-----------------+-------------+
## |business_id|userIndex_list|amount_spent_list|total_revenue|
## +-----------+--------------+-----------------+-------------+
## | 1| [0, 1]| [14.0, 11.0]| 36.7|
## | 0| [0]| [13.0]| 13.9|
## +-----------+--------------+-----------------+-------------+
##
```

Note that we use the UDF above to ensure user user indices are in ascending
order as the *collect_list* function does not ensure any specific ordering.

Now, all there’s left to do is represent every row as a sparse vector and
it’s corresponding label. I’ve also created a small case class that retains
the *business_id* identifier:

```
import org.apache.spark.ml.linalg.SparseVector
case class LabeledPoint2(label: Double, features: SparseVector, business: Int)
val num_columns = stringIndexerModel.labelsArray.size + 1
val sparse_user_business_amount_spent_df = user_business_amount_spent_df
.map(r => LabeledPoint2(r.getDouble(3),
new SparseVector(size = num_columns,
indices = r.getAs[Seq[Int]]("userIndex_list").toArray,
values = r.getAs[Seq[Double]]("amount_spent_list").toArray),
r.getInt(0)))
sparse_user_business_amount_spent_df.show(truncate=false)
```

```
##
## +-----+---------------------+--------+
## |label|features |business|
## +-----+---------------------+--------+
## |36.7 |(2,[0,1],[14.0,11.0])|1 |
## |13.9 |(2,[0],[13.0]) |0 |
## +-----+---------------------+--------+
##
```

and voila - there you have it - our matrix is encoded sparsely, enabling us to conveniently store it in memory and fitting to it all models available in spark ml in a scalable manner! In my application fitting a linear regression took 20-30 minutes.

```
import org.apache.spark.ml.regression.LinearRegression
val lr = new LinearRegression()
.setFitIntercept(false)
val lrModel = lr.fit(sparse_user_business_amount_spent_df)
```

We can see our regression arrived at the correct betas (1,2):

`lrModel.coefficients`

`## [1] "org.apache.spark.ml.linalg.Vector = [1.0692307692307697,1.9755244755244754]"`

One interesting fact about the above problem setup is that since it has more columns than rows it fits the training data perfectly (resulting in 0 training error). While this usually indicates overfitting, the above model performs pretty well in my application. This probably has to do with a phenomenon called “double descent risk curve”. More on that on my next post!