import matplotlib.pyplot as plt
import numpy as np
import pandas as pd


def _add_summary_columns(new: pd.DataFrame) -> None:
    """Attach whole-series statistics and per-row change columns to *new* in place."""
    values = new["value"]
    std = values.std()
    # Series.quantile() with no argument uses q=0.5, so this is the median.
    median = values.quantile()
    change = values.diff()

    new["std"] = std
    # NOTE(review): these bands are +/-2*std around zero, not around the mean.
    # Kept as-is because callers may read the columns -- confirm the intent.
    new["higher_std_value"] = std * 2
    new["lower_std_value"] = std * -2
    new["mean"] = values.mean()
    new["lower_10th_percentile"] = values.quantile(0.1)
    new["upper_10th_percentile"] = values.quantile(0.9)
    new["quantile"] = median
    new["percentile_value"] = median
    # A row is a "huge change" when its absolute step from the previous row
    # exceeds the median of the series; the first row has no step, so False.
    new["huge_change"] = change.abs() > median
    new["change"] = change
    new["percentile_value_of_observation"] = values.rank(pct=True)


def _add_model_scores(new: pd.DataFrame) -> None:
    """Attach LocalOutlierFactor and IsolationForest outputs to *new* in place."""
    # Imported lazily so that importing this module does not require scikit-learn.
    from sklearn.ensemble import IsolationForest
    from sklearn.neighbors import LocalOutlierFactor

    features = new[["value"]]

    # Despite the column name, fit_predict returns labels (-1 outlier, 1 inlier).
    new["LOF score"] = LocalOutlierFactor(n_neighbors=24).fit_predict(features)

    forest = IsolationForest(
        n_estimators=100,
        max_samples="auto",
        contamination=0.3,  # expected share of outliers in the data
        max_features=1.0,
        random_state=42,  # fixed seed keeps the labels reproducible
    )
    forest.fit(features)
    new["isolation_function_scores"] = forest.decision_function(features)
    new["isolation_anomaly_score"] = forest.predict(features)


def _plot_series(new: pd.DataFrame) -> None:
    """Draw the series, its reference lines and the anomaly markers on the current figure."""
    dates = new["date"]
    values = new["value"]

    plt.plot(dates, values, label="value")
    plt.plot(dates, new["higher_std_value"], label="higher_std_value", color="green")
    plt.plot(dates, new["mean"], label="mean", color="blue")
    plt.plot(dates, new["lower_std_value"], label="lower_std_value", color="red")
    for column in ("lower_10th_percentile", "upper_10th_percentile"):
        plt.plot(dates, new[column], label=column, color="orange", linestyle="--")

    # Unlabelled red dot(s) on the minimum of the series.
    is_minimum = values == values.min()
    plt.scatter(dates[is_minimum], values[is_minimum], color="red")

    is_anomaly = new["isolation_anomaly_score"] == -1
    plt.scatter(
        dates[is_anomaly],
        values[is_anomaly],
        color="black",
        marker="X",
        label="anomaly",
    )

    is_jump = new["huge_change"]
    plt.scatter(
        dates[is_jump],
        values[is_jump],
        color="purple",
        marker="D",
        label="huge_change",
    )

    # Annotate every point with its value. Iterating the columns is positional,
    # so this also works when the frame does not carry a default RangeIndex.
    for when, value in zip(dates, values):
        plt.text(when, value, str(value), fontsize=4)

    plt.legend()
    # Hide x-axis ticks and tick labels entirely.
    plt.tick_params(
        axis="x",
        which="both",
        bottom=False,
        top=False,
        labelbottom=False,
    )


def _score_last_point(new: pd.DataFrame, given_value: float) -> tuple:
    """Score the last row of *new* and return ``(score, logic)``.

    Rubric applied here:
        3  last value is below ``given_value``
        2  IsolationForest labelled the last row as an anomaly (-1)
        2  last value is below 30% of the series median
        1  the last row is a "huge change"
        1  last value is at or below the 10th percentile

    The maximum reachable score is therefore 9; the original rubric also
    listed a 1-point per-hour/day std criterion that is not implemented.
    """
    last = new.iloc[-1]
    # These columns hold one constant per frame, so min() just reads that value.
    median = new["quantile"].min()
    tenth_percentile = new["lower_10th_percentile"].min()

    score = 0
    logic = ""
    if last["value"] < given_value:
        logic += f"Last value is less than {given_value} /"
        score += 3
    if last["isolation_anomaly_score"] == -1:
        logic += " Anomaly detected in last point. /"
        score += 2
    if last["value"] < median * 0.3:
        logic += f" value less than 30% of quantile value of {median}. /"
        score += 2
    if last["huge_change"]:
        logic += " Huge change detected for last point. /"
        score += 1
    if last["value"] <= tenth_percentile:
        logic += " value less than 10th percentile. /"
        score += 1
    return score, logic


def anamoly_detection(
    new: pd.DataFrame,
    *,
    given_value: float = 100,
    show: bool = True,
) -> tuple:
    """Flag anomalies in a series, plot them, and score the most recent point.

    Summary statistics, row-to-row changes and the outputs of
    LocalOutlierFactor and IsolationForest are written into *new* as extra
    columns. The series is then drawn on the current matplotlib figure with
    its reference lines and markers, and the last row is scored.

    Args:
        new: Frame with a numeric ``value`` column and a ``date`` column used
            for the x axis. It is mutated in place: the columns ``std``,
            ``higher_std_value``, ``lower_std_value``, ``mean``,
            ``lower_10th_percentile``, ``upper_10th_percentile``,
            ``quantile``, ``percentile_value``, ``huge_change``, ``change``,
            ``percentile_value_of_observation``, ``LOF score``,
            ``isolation_function_scores`` and ``isolation_anomaly_score``
            are added or overwritten.
        given_value: Threshold for the last value; a last value below it
            contributes 3 points to the score.
        show: Whether to call ``plt.show()`` once the figure is built. Pass
            ``False`` to leave the figure open for the caller (for example
            to save it, or when running without a display).

    Returns:
        ``(score, message)``: the integer score of the last row and a
        "/"-separated text explaining which criteria fired.

    Raises:
        KeyError: If the ``date`` or ``value`` column is missing.
        ValueError: If *new* has no rows. scikit-learn may also raise this
            for frames too small to fit the models.
    """
    missing = [column for column in ("date", "value") if column not in new.columns]
    if missing:
        raise KeyError(f"anamoly_detection needs column(s): {', '.join(missing)}")
    if new.empty:
        raise ValueError("anamoly_detection needs at least one row")

    _add_summary_columns(new)
    _add_model_scores(new)
    _plot_series(new)

    score, logic = _score_last_point(new, given_value)
    last = new.iloc[-1]
    message = (
        f"Last value is {last['value']}, for date {last['date']}. "
        f"/ Anamoly Detection score is {score} out of 10. \n{logic} "
    )

    if show:
        plt.show()
    return score, message