Skip to content

Instantly share code, notes, and snippets.

@YazanShannak
Created December 30, 2019 14:33
Show Gist options
  • Select an option

  • Save YazanShannak/282ac4ad713255c6105baa208e4571ef to your computer and use it in GitHub Desktop.

Select an option

Save YazanShannak/282ac4ad713255c6105baa208e4571ef to your computer and use it in GitHub Desktop.
Migration code from MongoDB to Elasticsearch + add embeddings from encoder
from pymongo import MongoClient
import math
from tqdm import tqdm
from elasticsearch import Elasticsearch
from keras.models import load_model
import os
import cv2
import numpy as np
import copy
import logging
# ---------------------------------------------------------------------------
# Script configuration and shared handles.
# Everything below runs at import time: it opens the MongoDB and Elasticsearch
# clients, counts the source documents and loads the Keras encoder from disk.
# ---------------------------------------------------------------------------

# Side-effect-free settings first.
page_size = 50
images_path = os.path.join(os.curdir, 'images')
encoder_path = os.path.join(os.curdir, 'trials', 'encoder_5.h5')

# The log file is truncated on every run (filemode 'w').
# NOTE(review): no ``level`` is passed, so the root logger keeps its default
# threshold (WARNING) and the ``logging.debug`` success messages emitted by the
# migration loop will not reach this file -- confirm that is intended.
logging.basicConfig(filemode='w', filename='migration_log.txt')

# Source: the ``products`` collection of the ``crawled_products`` database.
client = MongoClient('localhost:27017')
db = client['crawled_products']
products = db['products']
count = products.count_documents({})

# Destination: Elasticsearch client built with the library defaults.
es = Elasticsearch()

# Number of ``page_size``-sized pages needed to cover ``count`` documents.
# NOTE(review): ``page_size`` / ``pages_count`` are not read anywhere else in
# the visible script.
pages_count = math.ceil(count / page_size)

# Model used to turn product images into embeddings.
encoder = load_model(encoder_path)
def add_to_elastic(_id, _doc):
    """Index ``_doc`` into the ``products`` index under the id ``_id``.

    The request is sent through the module-level ``es`` client with
    ``op_type='create'``.
    NOTE(review): per the Elasticsearch index API, ``create`` is expected to
    reject an id that already exists, so re-running the migration against a
    populated index would presumably raise -- confirm against the client docs.

    Args:
        _id: Identifier to store the document under.
        _doc: Document body (a JSON-serialisable mapping).

    Returns:
        None. The client's response is discarded.
    """
    request = {
        'index': 'products',
        'id': _id,
        'body': _doc,
        'op_type': 'create',
    }
    es.index(**request)
class Product:
    """A crawled product from MongoDB that can be turned into an ES document.

    The constructor extracts the fields that are migrated (``self.data``);
    the remaining methods locate the product's image on disk, run it through
    an encoder and attach the resulting embedding to a copy of ``self.data``.
    """

    def __init__(self, mongo_object, images_path):
        """Store the raw document and pre-compute the migrated fields.

        Args:
            mongo_object: Mapping for one MongoDB product document. It must
                provide the keys read by :meth:`parse_data`.
            images_path: Root directory that holds the product images.

        Raises:
            KeyError: If a required key is missing from ``mongo_object``.
            IndexError: If ``image_urls`` is an empty sequence.
        """
        self.mongo_object = mongo_object
        self.images_path = images_path
        self.data = self.parse_data()

    def parse_data(self):
        """Return the subset of the Mongo document that is sent to ES.

        Only the first entry of ``image_urls`` is kept, and ``_id`` is
        converted to ``str`` and stored as ``mongo_id``.
        """
        return {
            'name': self.mongo_object['name'],
            'category': self.mongo_object['category'],
            'price': self.mongo_object['price'],
            'vendor': self.mongo_object['vendor'],
            'image': self.mongo_object['image_urls'][0],
            'mongo_id': str(self.mongo_object['_id']),
        }

    def get_image_path(self):
        """Return ``<images_path>/<category>/<name>.jpg`` for this product."""
        # Use the directory handed to the constructor rather than a
        # module-level global, so the class also works outside this script.
        return os.path.join(
            self.images_path,
            self.data['category'],
            self.data['name'] + '.jpg',
        )

    def load_image(self, dim=200):
        """Load the product image, scale it by 1/255 and resize to dim x dim.

        Args:
            dim: Width and height (in pixels) of the returned image.

        Returns:
            The resized image array, or ``None`` when the image cannot be
            read or processed. Loading is best-effort and never raises for
            ordinary errors.
        """
        try:
            # NOTE(review): ``cv2.COLOR_BGR2RGB`` is a ``cvtColor`` conversion
            # code, not an ``imread`` flag; as a flag its integer value appears
            # to coincide with ``IMREAD_ANYCOLOR``, so no BGR->RGB swap happens
            # here. Left unchanged because the encoder was presumably trained
            # on images loaded the same way -- confirm before "fixing".
            image = cv2.imread(self.get_image_path(), cv2.COLOR_BGR2RGB)
            if image is None:
                # ``imread`` signals a missing/unreadable file with ``None``.
                return None
            return cv2.resize(image / 255, (dim, dim))
        except Exception:
            # Keep the original best-effort contract, but do not swallow
            # KeyboardInterrupt/SystemExit and leave a trace of what failed.
            logging.exception(
                'Failed to load image for document %s',
                self.data.get('mongo_id'),
            )
            return None

    def encode_image(self, _encoder, dim=200):
        """Return the encoder's embedding of the product image as a flat list.

        Args:
            _encoder: Object exposing ``predict(batch)`` that returns a NumPy
                array for a batch of one image.
            dim: Side length the image is resized to before encoding.

        Returns:
            A flat ``list`` of numbers, or ``None`` when the image could not
            be loaded or contains only zeros.
        """
        image = self.load_image(dim)
        if image is None:
            return None
        batch = np.stack([image], axis=0)
        # An all-zero image is treated as unusable (behaviour kept from the
        # original ``image.any()`` guard).
        if not batch.any():
            return None
        return _encoder.predict(batch).flatten().tolist()

    def get_elastic_document(self, _encoder, dim=200):
        """Build the ES document: a copy of ``self.data`` plus ``embedding``.

        Args:
            _encoder: Encoder forwarded to :meth:`encode_image`.
            dim: Side length forwarded to :meth:`encode_image`.

        Returns:
            A new dict (``self.data`` is left untouched) with an
            ``embedding`` key, or ``None`` when no embedding could be
            computed.
        """
        encoded = self.encode_image(_encoder, dim)
        if encoded is None:
            return None
        document = copy.deepcopy(self.data)
        document['embedding'] = encoded
        return document
# Migrate every product: build its ES document (fields + image embedding) and
# index it. Elasticsearch ids are the 1-based position of the document in the
# Mongo result, counting documents that end up being skipped as well.
# The cursor is materialised into a list so tqdm can show a total.
items = list(products.find())
for _id, item in enumerate(tqdm(items), start=1):
    try:
        p = Product(item, images_path)
    except (KeyError, IndexError, TypeError):
        # A malformed source document (missing field, empty/None image_urls)
        # should be logged and skipped, not abort the whole migration.
        logging.exception('Error migrating document %s', item.get('_id'))
        continue

    doc = p.get_elastic_document(encoder)
    if doc is None:
        # No embedding could be produced (image missing or unreadable).
        logging.error('Error migrating document %s', p.data.get('mongo_id'))
        continue

    add_to_elastic(_id, _doc=doc)
    logging.debug('Successfully migrated document %s', doc['mongo_id'])

# Total number of documents in the source collection (not the number migrated).
print(count)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment