-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathDeepSeekAgroMLSpark.txt
More file actions
218 lines (172 loc) · 8.25 KB
/
DeepSeekAgroMLSpark.txt
File metadata and controls
218 lines (172 loc) · 8.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
13/04/25 13:26:12
# Modelo de Predição de Risco de Crédito para Agronegócio em PySpark
## Visão Geral
Este modelo utiliza PySpark para avaliar riscos na concessão de crédito para operações do agronegócio, considerando variáveis como histórico de safras, condições climáticas, preços de commodities e índices de inadimplência.
## Estrutura do Projeto
```python
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder, StandardScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.functions import col, when
## 1. Inicialização da Sessão Spark
spark = SparkSession.builder \
.appName("AgroCreditRiskModel") \
.config("spark.executor.memory", "4g") \
.config("spark.driver.memory", "2g") \
.getOrCreate()
## 2. Carregamento e Pré-processamento dos Dados
def load_and_preprocess_data(file_path):
# Schema explícito para melhor performance
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType, DateType
schema = StructType([
StructField("operacao_id", StringType(), True),
StructField("data_operacao", DateType(), True),
StructField("tipo_cultura", StringType(), True),
StructField("regiao", StringType(), True),
StructField("area_hectares", DoubleType(), True),
StructField("produtividade_historica", DoubleType(), True),
StructField("preco_commodity_historico", DoubleType(), True),
StructField("indice_chuva", DoubleType(), True),
StructField("temperatura_media", DoubleType(), True),
StructField("score_solo", DoubleType(), True),
StructField("nivel_tecnologia", IntegerType(), True),
StructField("endividamento", DoubleType(), True),
StructField("inadimplencia_historica", DoubleType(), True),
StructField("risco_credito", IntegerType(), True) # 0 = baixo, 1 = alto
])
# Carrega os dados
df = spark.read.csv(file_path, schema=schema, header=True)
# Limpeza de dados
df = df.na.fill({
'produtividade_historica': df.selectExpr("percentile(produtividade_historica, 0.5)").first()[0],
'indice_chuva': df.selectExpr("avg(indice_chuva)").first()[0],
'temperatura_media': df.selectExpr("avg(temperatura_media)").first()[0],
'score_solo': 5.0 # Valor médio assumido para score do solo
})
# Engenharia de features
df = df.withColumn("estacao",
when((col("data_operacao").between("2020-12-21", "2021-03-20")) |
(col("data_operacao").between("2019-12-21", "2020-03-20")), "verao")
.when((col("data_operacao").between("2020-03-21", "2020-06-20")) |
(col("data_operacao").between("2019-03-21", "2019-06-20")), "outono")
.when((col("data_operacao").between("2020-06-21", "2020-09-20")) |
(col("data_operacao").between("2019-06-21", "2019-09-20")), "inverno")
.otherwise("primavera"))
return df
## 3. Pipeline de Machine Learning
def build_ml_pipeline():
# Transformações de categoria
tipo_cultura_indexer = StringIndexer(inputCol="tipo_cultura", outputCol="tipo_cultura_index")
regiao_indexer = StringIndexer(inputCol="regiao", outputCol="regiao_index")
estacao_indexer = StringIndexer(inputCol="estacao", outputCol="estacao_index")
# One-hot encoding
encoder = OneHotEncoder(inputCols=["tipo_cultura_index", "regiao_index", "estacao_index"],
outputCols=["tipo_cultura_vec", "regiao_vec", "estacao_vec"])
# Assembler para features numéricas e categóricas
feature_cols = ["area_hectares", "produtividade_historica", "preco_commodity_historico",
"indice_chuva", "temperatura_media", "score_solo", "nivel_tecnologia",
"endividamento", "inadimplencia_historica", "tipo_cultura_vec",
"regiao_vec", "estacao_vec"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
# Normalização
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
# Modelo de Random Forest
rf = RandomForestClassifier(featuresCol="scaledFeatures", labelCol="risco_credito",
numTrees=100, maxDepth=10, seed=42)
# Pipeline completo
pipeline = Pipeline(stages=[
tipo_cultura_indexer,
regiao_indexer,
estacao_indexer,
encoder,
assembler,
scaler,
rf
])
return pipeline
## 4. Treinamento e Avaliação do Modelo
def train_and_evaluate(df, pipeline):
# Split dos dados
train, test = df.randomSplit([0.8, 0.2], seed=42)
# Grid de parâmetros para tuning
paramGrid = (ParamGridBuilder()
.addGrid(RandomForestClassifier.numTrees, [50, 100, 150])
.addGrid(RandomForestClassifier.maxDepth, [5, 10, 15])
.addGrid(RandomForestClassifier.impurity, ["gini", "entropy"])
.build())
# Avaliador
evaluator = BinaryClassificationEvaluator(labelCol="risco_credito",
rawPredictionCol="rawPrediction",
metricName="areaUnderROC")
# Cross-validation
cv = CrossValidator(estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=evaluator,
numFolds=5,
seed=42)
# Treinamento do modelo
cv_model = cv.fit(train)
# Avaliação
train_predictions = cv_model.transform(train)
test_predictions = cv_model.transform(test)
train_auc = evaluator.evaluate(train_predictions)
test_auc = evaluator.evaluate(test_predictions)
print(f"AUC - Treino: {train_auc:.4f}, Teste: {test_auc:.4f}")
return cv_model
## 5. Função Principal
def main():
# Carrega os dados
data_path = "hdfs://path/to/agro_credit_data.csv" # ou caminho local
agro_df = load_and_preprocess_data(data_path)
# Constrói o pipeline
pipeline = build_ml_pipeline()
# Treina e avalia o modelo
model = train_and_evaluate(agro_df, pipeline)
# Salva o modelo para produção
model.save("hdfs://path/to/models/agro_credit_risk_model")
# Exemplo de predição
example_data = spark.createDataFrame([
("OP123", "2023-03-15", "Soja", "Centro-Oeste", 100.0, 60.0, 150.0,
1200.0, 24.5, 6.0, 3, 0.4, 0.05, None)
], agro_df.schema)
prediction = model.transform(example_data)
prediction.select("operacao_id", "prediction", "probability").show(truncate=False)
if __name__ == "__main__":
main()
```
## Variáveis Explicativas do Modelo
1. **Dados Históricos de Safra**:
- Produtividade histórica (ton/ha)
- Tipo de cultura (soja, milho, café, etc.)
- Área cultivada (hectares)
2. **Fatores Climáticos**:
- Índice pluviométrico (mm)
- Temperatura média (°C)
- Estação do ano
3. **Indicadores Econômicos**:
- Preço histórico da commodity (R$/sc)
- Endividamento do produtor (%)
- Nível de tecnologia aplicada (escala 1-5)
4. **Risco de Crédito**:
- Histórico de inadimplência (%)
- Score de qualidade do solo
## Melhorias Possíveis
1. **Integração com APIs externas**:
- Dados climáticos em tempo real
- Cotações de commodities atualizadas
2. **Feature Engineering Avançado**:
- Médias móveis de preços de commodities
- Tendências sazonais de produtividade
- Índices de vegetação (NDVI) via imagens de satélite
3. **Modelos Alternativos**:
- Gradient Boosted Trees (GBT)
- Modelos de ensemble
- Deep Learning para séries temporais
4. **Monitoramento Contínuo**:
- Pipeline de retreinamento automático
- Alertas para drift de dados
- Painel de performance do modelo
Este modelo fornece uma base robusta para avaliação de risco de crédito no agronegócio, podendo ser adaptado para diferentes culturas e regiões com ajustes no conjunto de dados e parâmetros do modelo.