import pandas as pd import numpy as np import preprocessors.preprocessing_utils as utils from preprocessors.preprocessing_utils import vectorize from preprocessors import FINAL_PROCESSED from service import k_means, gbt, ss def get_weekday(timestamp): return timestamp.now().weekday() def enrich_prediction_request(lat, lng, n, timestamp): clusters = get_clusters(lat, lng, n) preprocessed_data = ss.read.csv(FINAL_PROCESSED, inferSchema=True, header=True).toPandas() to_predict = [] for cluster in clusters: data4cluster: pd.DataFrame = preprocessed_data[preprocessed_data["Latitude"] == cluster[0]][ preprocessed_data["Longitude"] == cluster[1]] ft_1 = data4cluster["ft_1"].mean() ft_2 = data4cluster["ft_2"].mean() ft_3 = data4cluster["ft_3"].mean() ft_4 = data4cluster["ft_4"].mean() ft_5 = data4cluster["ft_5"].mean() freq1 = data4cluster.iloc[1]["freq1"] freq2 = data4cluster.iloc[1]["freq2"] freq3 = data4cluster.iloc[1]["freq3"] freq4 = data4cluster.iloc[1]["freq4"] freq5 = data4cluster.iloc[1]["freq5"] amp1 = data4cluster.iloc[1]["Amp1"] amp2 = data4cluster.iloc[1]["Amp2"] amp3 = data4cluster.iloc[1]["Amp3"] amp4 = data4cluster.iloc[1]["Amp4"] amp5 = data4cluster.iloc[1]["Amp5"] wma = data4cluster["WeightedAvg"].mean() rec = create_record(ft_5, ft_4, ft_3, ft_2, ft_1, freq1, freq2, freq3, freq4, freq5, amp1, amp2, amp3, amp4, amp5, cluster[0], cluster[1], get_weekday(timestamp), wma) to_predict.append(rec) to_predict_df = pd.concat(to_predict) df_spark = ss.createDataFrame(to_predict_df) vectorized = vectorize(df_spark.columns, df_spark) res: pd.DataFrame = gbt.predict(vectorized).toPandas()[["Latitude", "Longitude", "prediction"]] res: pd.DataFrame = res.sort_values("prediction").reset_index(drop=True) res["priority"] = res.index + 1 res = res.drop("prediction", axis=1) return res.to_dict(orient='records') def get_clusters(lat, lng, n): from sklearn.metrics import pairwise_distances_argmin_min requestor_location = np.array([float(lat), float(lng)]).reshape((1, -1)) df = pd.DataFrame(data=requestor_location, columns=["lat", "lng"]) df_Spark = ss.createDataFrame(df) df = utils.vectorize(df_Spark.columns, df_Spark) # current location cluster centers = k_means.get_centers() cluster = k_means.predict(df).collect()[0]["pickup_cluster"] current_cluster_coords = centers.pop(cluster) # find nearest n clusters around current nearest_centroids = pairwise_distances_argmin_min(centers, requestor_location)[1].tolist() nearest_centroids_dict = {} for c in range(len(nearest_centroids)): nearest_centroids_dict[c] = nearest_centroids[c] additional_clusters = sorted(nearest_centroids_dict, key=nearest_centroids_dict.get)[:int(n)] result = [] for cluster_number in additional_clusters: result.append(centers[cluster_number]) result.append(current_cluster_coords) return result def create_record(ft_5, ft_4, ft_3, ft_2, ft_1, freq1, freq2, freq3, freq4, freq5, Amp1, Amp2, Amp3, Amp4, Amp5, Latitude, Longitude, WeekDay, WeightedAvg): data = [{"ft_5": ft_5, "ft_4": ft_4, "ft_3": ft_3, "ft_2": ft_2, "ft_1": ft_1, "freq1": freq1, "freq2": freq2, "freq3": freq3, "freq4": freq4, "freq5": freq5, "Amp1": Amp1, "Amp2": Amp2, "Amp3": Amp3, "Amp4": Amp4, "Amp5": Amp5, "Latitude": Latitude, "Longitude": Longitude, "WeekDay": WeekDay, "WeightedAvg": WeightedAvg}] model_dto_df = pd.DataFrame(data) return model_dto_df
Public
Type hints help humans and linters (like mypy
) to understand what to expect "in" and "out" for a function. Not only it serves as a documentation for others (and you after some time, when the code is wiped from your "brain cache"), but also allows using automated tools to find type errors.
Other people may not know what's expected in ft_5
, ft_4
, ft_3
, ft_2
, ft_1
variables. int
? float
? dict
?
def create_record(ft_5: Union[int, float], ...):
Copy-paste lead to errors very, very often because developers forget to change value on one of copy-pasted lines . You should avoid it as much as possible. Usually a good solution is to extract the things that differ into separate variables.
Many pandas dataframe methods have inplace
param which modifies the dataframe itself and does not create a new one. This is helpful if you want to avoid redundant creation of new dataframe and removing an old one, which happens when assigning to original variable: df = df.apply(...)
-> df.apply(..., inplace=True)
.
res.drop("prediction", axis=1, inplace=True)
Using len
and range
in python's for
loop smells. Idiomatic python iteration looks like for element in collection
. If you need element's index as well, use for i, element in enumerate(collection)
.
for i, centroid in enumerate(nearest_centroids): nearest_centroids_dict[i] = centroid
List comprehensions are faster than .append()
or .extend()
. When you use a loop for filling in a list, it's a high chance that list comprehension will do a better job. You may also use asterisk (*
) to generate list from single and multiple elements: all_items = [*items_group1, *items_group2, single_element]
result = [ centers[cluster_number] for cluster_numer in additional_clusters ]
When function has too many parameters, it is very hard to read and also it is very easy to accidentally mix them and instead of calling fn(.., o, p, q)
call fn(..., o, q, p)
. Solution is to either refactor the function to receive smaller amount of data (maybe function is too big and can be divided into smaller functions?), or to send one parameter which is some data structure and incorporates all the parameters (this structure could be NamedTuple
, dataclass
, or TypedDict
).
Create new review request