db.collection.aggregate(
[
{
"$addFields": {
"indexes": {
"$range": [
0,
{
"$size": "$time_series"
}
]
},
"reversedSeries": {
"$reverseArray": "$time_series"
}
}
},
{
"$project": {
"derivatives": {
"$reverseArray": {
"$slice": [
{
"$map": {
"input": {
"$zip": {
"inputs": [
"$reversedSeries",
"$indexes"
]
}
},
"in": {
"$subtract": [
{
"$arrayElemAt": [
"$$this",
0
]
},
{
"$arrayElemAt": [
"$reversedSeries",
{
"$add": [
{
"$arrayElemAt": [
"$$this",
1
]
},
1
]
}
]
}
]
}
}
},
{
"$subtract": [
{
"$size": "$time_series"
},
1
]
}
]
}
},
"time_series": 1
}
}
]
)
Podemos usar la tubería anterior en la versión 3.4+ para hacer esto. En la tubería, usamos $addFields
etapa de tubería. operador para agregar la matriz del índice de elementos de "time_series" para hacer el documento, también invertimos la matriz de series temporales y la agregamos al documento usando respectivamente $range
y $reverseArray
operadores
Invertimos la matriz aquí porque el elemento en la posición p
en la matriz siempre es mayor que el elemento en la posición p+1
lo que significa que [p] - [p+1] < 0
y no queremos usar el $multiply
aquí. (ver tubería para la versión 3.2)
A continuación, $zipped
los datos de la serie temporal con la matriz de índices y aplicó un substract
expresión a la matriz resultante usando el $map
operador.
Luego $slice
el resultado para descartar el null/None
valor de la matriz y volvió a invertir el resultado.
En 3.2 podemos usar $unwind
operador para relajarse nuestra matriz e incluir el índice de cada elemento en la matriz especificando un documento como operando en lugar de la "ruta" tradicional precedida por $ .
A continuación en la tubería, necesitamos $group
nuestros documentos y use el $push
operador acumulador para devolver una matriz de subdocumentos que se ven así:
{
"_id" : ObjectId("57c11ddbe860bd0b5df6bc64"),
"time_series" : [
{ "value" : 10, "index" : NumberLong(0) },
{ "value" : 20, "index" : NumberLong(1) },
{ "value" : 40, "index" : NumberLong(2) },
{ "value" : 70, "index" : NumberLong(3) },
{ "value" : 110, "index" : NumberLong(4) }
]
}
Finalmente viene el $project
escenario. En esta etapa, necesitamos usar el $map
operador para aplicar una serie de expresiones a cada elemento en la matriz recién calculada en el $group
escenario.
Esto es lo que sucede dentro del $map
(ver $map
como un bucle for) en expresión:
Para cada subdocumento, asignamos el valor campo a una variable usando $let
operador de variables Luego restamos su valor del valor del campo "valor" del siguiente elemento en la matriz.
Dado que el siguiente elemento en la matriz es el elemento en el índice actual más uno, todo lo que necesitamos es la ayuda de $arrayElemAt
operador y un simple $add
ición del índice del elemento actual y 1
.
El $subtract
expresión devuelve un valor negativo, por lo que debemos multiplicar el valor por -1
usando el $multiply
operador.
También necesitamos $filter
la matriz resultante porque el último elemento es None
o null
. La razón es que cuando el elemento actual es el último elemento, $subtract
devolver None
porque el índice del siguiente elemento es igual al tamaño de la matriz.
db.collection.aggregate([
{
"$unwind": {
"path": "$time_series",
"includeArrayIndex": "index"
}
},
{
"$group": {
"_id": "$_id",
"time_series": {
"$push": {
"value": "$time_series",
"index": "$index"
}
}
}
},
{
"$project": {
"time_series": {
"$filter": {
"input": {
"$map": {
"input": "$time_series",
"as": "el",
"in": {
"$multiply": [
{
"$subtract": [
"$$el.value",
{
"$let": {
"vars": {
"nextElement": {
"$arrayElemAt": [
"$time_series",
{
"$add": [
"$$el.index",
1
]
}
]
}
},
"in": "$$nextElement.value"
}
}
]
},
-1
]
}
}
},
"as": "item",
"cond": {
"$gte": [
"$$item",
0
]
}
}
}
}
}
])
Otra opción que creo que es menos eficiente es realizar una operación de mapa/reducción en nuestra colección usando map_reduce
método.
>>> import pymongo
>>> from bson.code import Code
>>> client = pymongo.MongoClient()
>>> db = client.test
>>> collection = db.collection
>>> mapper = Code("""
... function() {
... var derivatives = [];
... for (var index=1; index<this.time_series.length; index++) {
... derivatives.push(this.time_series[index] - this.time_series[index-1]);
... }
... emit(this._id, derivatives);
... }
... """)
>>> reducer = Code("""
... function(key, value) {}
... """)
>>> for res in collection.map_reduce(mapper, reducer, out={'inline': 1})['results']:
... print(res) # or do something with the document.
...
{'value': [10.0, 20.0, 30.0, 40.0], '_id': ObjectId('57c11ddbe860bd0b5df6bc64')}
También puede recuperar todo el documento y usar el numpy.diff
para devolver la derivada así:
import numpy as np
for document in collection.find({}, {'time_series': 1}):
result = np.diff(document['time_series'])