### transfer_mysql_to_mongodb.py
from django.core.management.base import BaseCommand
from django.conf import settings
import pymongo
import pymysql
import decimal
import time
from retry import retry
from datetime import datetime
import os

import logging
logger = logging.getLogger(__name__)
logging_file = os.path.join(settings.LOGGING_FILE_ROOT, 'v1_transfer_mysql_to_mongodb.log')
logging.basicConfig(filename=logging_file, level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')

class Command(BaseCommand):
    help = 'Transfer data from MySQL to MongoDB'
    
    def add_arguments(self, parser):
        ### limit argument
        parser.add_argument('--limit', type=int, nargs='?', default=None, help='Limit the number of records to transfer')
        ### step argument
        parser.add_argument('--step', type=int, nargs='?', default=None, help='Step to execute')
        ### batch_size argument
        parser.add_argument('--batch_size', type=int, nargs='?', default=None, help='batch_size to execute')

    def handle(self, *args, **options):
        #Start time
        start_time = time.time()
        
        ## Get the limit from the command line arguments
        limit = options['limit']
        
        ## Get the step from the command line arguments
        step = options['step']
        
        ## Get the batch_size from the command line arguments
        batch_size = options['batch_size']

        # Define a dictionary to map MySQL tables to MongoDB collections
        table_to_collection = {
            # 'vw_properties': 'v1_properties',
            # 'vw_offices': 'v1_offices',
            # 'vw_agents': 'v1_agents',
            # 'vw_properties_location': 'v1_properties_location',
            # 'vw_features': 'v1_features',
            #'vw_medias': 'v1_medias',
            # 'vw_properties_feature': 'v1_properties_feature',
            #'vw_properties_media': 'v1_properties_media',
            # 'vw_types': 'v1_types',
            # 'vw_usages': 'v1_usages',
            

            'agents': ['v1_mysql_agents','email'],
            'features': ['v1_mysql_features','id'],
            'offices': ['v1_mysql_offices','id'],
            #'properties': ['v1_mysql_properties','listing_id','agent_email','office_id'],
            'types': ['v1_mysql_types','id'],
            'usages': ['v1_mysql_usages','id'],
            #'properties_media': ['v1_mysql_properties_media','listing_id','id'],
            'properties_features': ['v1_mysql_properties_features','listing_id','feature_id'],
            'properties_locations': ['v1_mysql_properties_location','listing_id'],
        }
        
        table_media_to_collection = {
            'properties_media': ['v1_mysql_properties_media','listing_id','id'],
        }
        
        table_properties_to_collection = {
            'properties': ['v1_mysql_properties','listing_id','agent_email','office_id'],
        }

        collection_from_mysql = {
            'v1_mysql_agents_pictures': ["select u.email, u.foto, u.creci from remax_maxcenter.usuarios u WHERE u.foto <>'' group by u.email;","email"],
            'v1_mysql_agents_complements': ["SELECT * FROM `offices_agents_complements` where type='agent' GROUP BY referer_id;","referer_id"],
            'v1_mysql_offices_complements': ["SELECT *, CAST(referer_id AS UNSIGNED) as referer_id FROM `offices_agents_complements` where type='office' GROUP BY referer_id;","referer_id"],
        }
        
        
        
        pipelines = {
            ### Pipeline for agents collection
                'v1_stage_agents': [
                {
                    '$unionWith':
                    {
                        'coll': "v1_mysql_agents",
                    },
                },
                {
                    '$match': {
                        'id': {
                            '$exists': True
                        }
                    }
                },
                {
                    '$lookup': {
                        'from': 'v1_mysql_agents_complements',
                        'localField': 'email',
                        'foreignField': 'referer_id',
                        'as': 'complements_info',
                        'pipeline': [
                            {
                                '$project': {
                                    '_id': 0,
                                    'referer_id': 0,
                                    'type': 0,
                                    'id': 0,
                                    'error': 0,
                                }
                            }
                        ]
                    }
                },
                {
                    '$lookup': {
                        'from': 'v1_mysql_agents_pictures',
                        'localField': 'email',
                        'foreignField': 'email',
                        'as': 'profile_picture_info',
                         'pipeline': [
                                        {
                                        '$project': {
                                                    '_id': 0,
                                                    'foto': 1,
                                                    'email': 1,
                                                    'creci': 1
                                                    }
                                        }
                                    ]
                    }
                },
                {
                    '$unwind': {
                        'path': '$profile_picture_info',
                        'preserveNullAndEmptyArrays': True
                    }
                },
                {
                    '$addFields': {
                        'profile_picture': {
                            '$replaceOne': {
                                'input': '$profile_picture_info.foto',
                                'find': 'http://',
                                'replacement': 'https://'
                            }
                        },
                        'creci': '$profile_picture_info.creci'
                    }
                },
                {
                    '$lookup': {
                        'from': 'v1_mysql_properties',
                        'localField': 'email',
                        'foreignField': 'agent_email',
                        'as': 'property_info',
                        'pipeline': [
                            {
                                '$project': {
                                    'agent_email': 1
                                    }
                            }
                        ]
                    }
                },
                {
                    '$addFields': {
                        'total_properties': {'$size': '$property_info'},
                        'agent_id': {'$toInt': '$id'}
                    }
                },
                {
                    '$group': {
                        '_id': '$agent_id',
                        'docs': {'$first': '$$ROOT'}
                    }
                },
                {
                    '$replaceRoot': {
                    'newRoot': '$docs'
                    }
                },
                {
                    '$project': {
                        'agent_id': 1,
                        'name': 1,
                        'email': 1,
                        'creci': 1,
                        'profile_picture': 1,
                        'total_properties': 1,
                        'office_id': 1,
                        'complements_info': 1
                    }
                }
            ],
            
            ### Pipeline for offices collection
            'v1_stage_offices':  [
                {
                    '$unionWith':
                    {
                        'coll': "v1_mysql_offices",
                    },
                },
                {
                    '$match': {
                        'id': {
                            '$exists': True
                        }
                    }
                },
                {
                    '$lookup': {
                        'from': 'v1_mysql_offices_complements',
                        'localField': 'id',
                        'foreignField': 'referer_id',
                        'as': 'complements_info',
                        'pipeline': [
                            {
                                '$project': {
                                    '_id': 0,
                                    'referer_id': 0,
                                    'type': 0,
                                    'id': 0,
                                    'error': 0,
                                }
                            }
                        ]
                    }
                },
                {
                    '$lookup': {
                        'from': 'v1_mysql_properties',
                        'localField': 'id',
                        'foreignField': 'office_id',
                        'as': 'property_info'
                    }
                },
                {
                    '$addFields': {
                        'total_properties': {'$size': '$property_info'},
                        'office_id': {'$toInt': '$id'},
                        'lng': '$lon',
                    }
                },
                {
                    '$group': {
                        '_id': '$office_id',
                        'docs': {'$first': '$$ROOT'}
                    }
                },
                {
                    '$replaceRoot': {
                    'newRoot': '$docs'
                    }
                },
                {
                    '$project': {
                        'office_id': 1,
                        'name': 1,
                        'logo':1,
                        'website': 1,
                        'telephone': 1,
                        'country': 1,
                        'country_abbreviation': 1,
                        'state': 1,
                        'state_abbreviation': 1,
                        'city': 1,
                        'address': 1,
                        'postal_code': 1,
                        'lat': 1,
                        'lng': 1,
                        'total_properties': 1,
                        'complements_info': 1,
                    }
                }
            ],
            
            ### Pipeline for types collection
            'v1_stage_types':  [
                {
                    '$unionWith':
                    {
                        'coll': "v1_mysql_types",
                    },
                },
                {
                    '$match': {
                        'id': {
                            '$exists': True
                        }
                    }
                },
                {
                    '$addFields': {
                        'type_id': {'$toInt': '$id'},
                        'type_name_en': {'$toString': '$name_en'},
                        'type_name_pt': {'$toString': '$name_pt'}
                    }
                    
                },
                {
                    '$group': {
                        '_id': '$type_id',
                        'docs': {'$first': '$$ROOT'}
                    }
                },
                {
                    '$replaceRoot': {
                    'newRoot': '$docs'
                    }
                },
                {
                    '$project': {
                        'type_id': 1,
                        'type_name_en': 1,
                        'type_name_pt': 1
                    }
                }
            ],
            
            ## Pipeline for usages collection
            'v1_stage_usages':  [
                {
                    '$unionWith':
                    {
                        'coll': "v1_mysql_usages",
                    },
                },
                {
                    '$match': {
                        'id': {
                            '$exists': True
                        }
                    }
                },
                {
                    '$addFields': {
                        'usage_id': {'$toInt': '$id'},
                        'usage_name_en': {'$toString': '$name_en'},
                        'usage_name_pt': {'$toString': '$name_pt'}
                    }
                    
                },
                {
                    '$group': {
                        '_id': '$usage_id',
                        'docs': {'$first': '$$ROOT'}
                    }
                },
                {
                    '$replaceRoot': {
                    'newRoot': '$docs'
                    }
                },
                {
                    '$project': {
                        'usage_id': 1,
                        'usage_name_en': 1,
                        'usage_name_pt': 1
                    }
                }
            ],
            
            ### Pipeline for features collection
            'v1_stage_features':  [
                {
                    '$unionWith':
                    {
                        'coll': "v1_mysql_features",
                    },
                },
                {
                    '$match': {
                        'id': {
                            '$exists': True
                        }
                    }
                },
                {
                    '$addFields': {
                        'feature_id': {'$toInt': '$id'},
                        'feature_name_en': {'$toString': '$name_en'},
                        'feature_name_pt': {'$toString': '$name_pt'}
                    }
                    
                },
                {
                    '$group': {
                        '_id': '$feature_id',
                        'docs': {'$first': '$$ROOT'}
                    }
                },
                {
                    '$replaceRoot': {
                    'newRoot': '$docs'
                    }
                },
                {
                    '$project': {
                        'feature_id': 1,
                        'feature_name_en': 1,
                        'feature_name_pt': 1
                    }
                }
            ],
            
            ### Pipeline for properties_features collection
            'v1_stage_properties_features':  [
                {
                    '$unionWith':
                    {
                        'coll': "v1_mysql_properties_features",
                    },
                },
                {
                    '$match': {
                        'id': {
                            '$exists': True
                        }
                    }
                },
                {
                    '$addFields': {
                        'property_feature_id': {'$toInt': '$id'},
                        'feature_id': {'$toInt': '$item'},
                        'property_id': {
                            '$toLong': {
                                '$replaceAll': {
                                    'input': '$listing_id',
                                    'find': '-',
                                    'replacement': ''
                                }
                            }
                        }
                    }
                },
                {
                    '$group': {
                        '_id': '$property_feature_id',
                        'docs': {'$first': '$$ROOT'}
                    }
                },
                {
                    '$replaceRoot': {
                    'newRoot': '$docs'
                    }
                },
                {
                    '$project': {
                        'property_feature_id': 1,
                        'feature_id': 1,
                        'property_id': 1
                    }
                }
            ],
            
            ### Pipeline for location collection
            'v1_stage_properties_location':  [
                {
                    '$unionWith':
                    {
                        'coll': "v1_mysql_properties_location",
                    },
                },
                {
                    '$match': {
                        'id': {
                            '$exists': True
                        }
                    }
                },
                {
                    '$addFields': {
                        'property_location_id': {'$toInt': '$id'},
                        'lng': '$lon',
                        'property_id': {
                            '$toLong': {
                                '$replaceAll': {
                                    'input': '$listing_id',
                                    'find': '-',
                                    'replacement': ''
                                }
                            }
                        }
                    }
                },
                {
                    '$group': {
                        '_id': '$property_location_id',
                        'docs': {'$first': '$$ROOT'}
                    }
                },
                {
                    '$replaceRoot': {
                    'newRoot': '$docs'
                    }
                },
                {
                    '$project': {
                        'property_location_id': 1,
                        'office_id': 1,
                        'display_address': 1,
                        'country':1,
                        'country_abbreviation':1,
                        'state':1,
                        'city':1,
                        'zone':1,
                        'neighborhood':1,
                        'complement':1,
                        'address':1,
                        'street_number':1,
                        'postal_code':1,
                        'lat':1,
                        'lng':1,
                        'property_id': 1,
                    }
                },
                {
                      '$unset': 'office_id'
  				}
            ]  
            }
        
        pipeline_v1_stage_properties_media = {
                        ### Pipeline for medias collection
            'v1_stage_properties_media':  [
                {
                    '$unionWith':
                    {
                        'coll': "v1_mysql_properties_media",
                    },
                },
                {
                    '$match': {
                        'id': {
                            '$exists': True
                        }
                    }
                },
                {
                    '$addFields': {
                        'property_media_id': {'$toInt': '$id'},
                        'property_id': {
                            '$toLong': {
                                '$replaceAll': {
                                    'input': '$listing_id',
                                    'find': '-',
                                    'replacement': ''
                                }
                            }
                        }
                    }
                },
                {
                    '$group': {
                        '_id': '$property_media_id',
                        'docs': {'$first': '$$ROOT'}
                    }
                },
                {
                    '$replaceRoot': {
                    'newRoot': '$docs'
                    }
                },
                {
                    '$project': {
                        'property_media_id': 1,
                        'type': 1,
                        'main': 1,
                        'url': 1,
                        'property_id': 1
                    }
                }
            ],
        }
        
        pipeline_v1_stage_properties_nested = {
            
            'v1_stage_properties_nested':  [
                    {
                        '$unionWith':
                        {
                            'coll': "v1_mysql_properties",
                        },
                    },
                    {
                        '$match': {
                            'listing_id': {
                                '$exists': True
                            }
                        }
                    },
                    {
                        '$addFields': {
                            'transfer_date': datetime.now(),
                            'property_id': {
                                '$toLong': {
                                    '$replaceAll': {
                                        'input': '$listing_id',
                                        'find': '-',
                                        'replacement': ''
                                    }
                                }
                            }
                        }
                    },
                    {
                        '$lookup':
                        {
                            'from': "v1_stage_types",
                            'localField': "property_type",
                            'foreignField': "type_id",
                            'as': "type",
                            'pipeline': [
                            {
                                '$project': {
                                '_id': 0,
                                },
                            },
                            ],
                        },
                    },
                    {
                        '$lookup':
                        {
                            'from': "v1_stage_usages",
                            'localField': "usage_type",
                            'foreignField': "usage_id",
                            'as': "usage",
                            'pipeline': [
                            {
                                '$project': {
                                '_id': 0,
                                },
                            },
                            ],
                        },
                    },
                    {
                        '$lookup':
                        {
                            'from': "v1_stage_offices",
                            'localField': "office_id",
                            'foreignField': "office_id",
                            'as': "office",
                            'pipeline': [
                            {
                                '$project': {
                                '_id': 0,
                                },
                            },
                            ],
                        },
                    },
                    {
                        '$lookup':
                        {
                            'from': "v1_stage_agents",
                            'localField': "agent_email",
                            'foreignField': "email",
                            'as': "agent",
                            'pipeline': [
                            {
                                '$project': {
                                '_id': 0,
                                },
                            },
                            ],
                        },
                    },
                    {
                        '$lookup':
                        {
                            'from': "v1_stage_properties_location",
                            'localField': "property_id",
                            'foreignField': "property_id",
                            'as': "location",
                            'pipeline': [
                            {
                                '$project': {
                                '_id': 0,
                                },
                            },
                            ],
                        },
                    },
                    ### get all the medias from v1_properties_media of a property_id
                    {
                        '$lookup': {
                            'from': "v1_stage_properties_media",
                            'localField': "property_id",
                            'foreignField': "property_id",
                            'as': "medias",
                            'pipeline': [
                            {
                                '$project': {
                                '_id': 0,
                                'property_id':0,
                                },
                            },
                            ],                            
                        }
                    },
                    {
                        '$lookup': {
                            'from': "v1_stage_properties_features",
                            'localField': "property_id",
                            'foreignField': "property_id",
                            'as': "features",
                             'pipeline': [
                            {
                                '$project': {
                                '_id': 0,
                                'property_id':0
                                },
                            },
                            ],                           
                        }
                    },
                    {
                        '$lookup': {
                            'from': "v1_stage_features",
                            'localField': "features.feature_id",
                            'foreignField': "feature_id",
                            'as': "features",
                            'pipeline': [
                            {
                                '$project': {
                                '_id': 0,
                                },
                            },
                            ],                       
                        }
                    },
                  {
                    '$group': {
                        '_id': '$listing_id',
                        'docs': {'$first': '$$ROOT'}
                    }
                },
                {
                    '$replaceRoot': {
                    'newRoot': '$docs'
                    }
                },
                    {
                      '$unset': 'id'
  					},
                    {
                       '$unset': 'property_type'
  					},
                    {
                       '$unset': 'usage_type'
  					},
                    {
                      '$unset': 'region_id'
  					},
                    {
                      '$unset': 'office_id'
  					},
                    {
                      '$unset': 'office_name'
  					},
                    {
                      '$unset': 'agent_email'
  					}
            ]
        }
        
        
        collection_indexes = {
            'v1_stage_agents': ['email'],
            'v1_stage_offices': ['id'],
            'v1_stage_types': ['id'],
            'v1_stage_usages': ['id'],
            'v1_stage_features': ['id'],
            'v1_stage_properties_media': ['property_id'],
            'v1_stage_properties_features': ['feature_id', 'property_id'],
            'v1_stage_properties_location': ['property_id'],
            'v1_stage_properties_nested': ['property_id']
        }
        
        collection_names_to_be_changed = {
            'v1_stage_agents': 'v1_agents',
            'v1_stage_offices': 'v1_offices',
            'v1_stage_types': 'v1_types',
            'v1_stage_usages': 'v1_usages',
            'v1_stage_features': 'v1_features',
            'v1_stage_properties_media': 'v1_properties_media',
            'v1_stage_properties_features': 'v1_properties_features',
            'v1_stage_properties_location': 'v1_properties_location',
            'v1_stage_properties_nested': 'v1_properties_nested'
        }
        
        
        step_number=1
        if step is None or step == step_number:
            ### Transfer data from MySQL to MongoDB
            self.stdout.write(self.style.WARNING(f'Step {step_number}: Transferring data from MySQL to MongoDB...'))
            try:
                self.transfer_mysql_to_mongodb(table_to_collection, limit, batch_size)
            except Exception as e:
                logger.error(f"Error in step {step_number} transferring data from MySQL to MongoDB: {str(e)}")
        
        step_number=2        
        if step is None or step == step_number:
            ### Transfer data from MySQL to MongoDB
            self.stdout.write(self.style.WARNING(f'Step {step_number}: Transferring media data from MySQL to MongoDB...'))
            try:
                if batch_size is None:
                    batch_size_ = 200000
                else:
                    batch_size_ = batch_size
                self.transfer_mysql_to_mongodb(table_media_to_collection, limit, batch_size_)
            except Exception as e:
                logger.error(f"Error in step {step_number} transferring media data from MySQL to MongoDB: {str(e)}")
                
        step_number=3
        if step is None or step == step_number:
            ### Transfer data from MySQL to MongoDB
            self.stdout.write(self.style.WARNING(f'Step {step_number}: Transferring properties data from MySQL to MongoDB...'))
            try:
                if batch_size is None:
                    batch_size_ = 10000
                else:
                    batch_size_ = batch_size
                self.transfer_mysql_to_mongodb(table_properties_to_collection, limit, batch_size_)
            except Exception as e:
                logger.error(f"Error in step {step_number} transferring media data from MySQL to MongoDB: {str(e)}")

        step_number=4
        if step is None or step == step_number:
            ### Transfer data from MySQL to MongoDB using a command
            self.stdout.write(self.style.WARNING(f'Step {step_number}: Transferring data from MySQL to MongoDB using a mysql query...'))
            try:
                self.transfer_mysql_query_to_mongodb(collection_from_mysql, limit, batch_size)
            except Exception as e:
                logger.error(f"Error in step {step_number} transferring data from MySQL to MongoDB using command: {str(e)}")
        
        step_number=5
        if step is None or step == step_number:
            ### Create a collection from a pipeline (Stage)
            self.stdout.write(self.style.WARNING(f'Step {step_number}: Creating collections from pipelines...'))
            try:
                self.create_collection_from_pipeline(pipelines, collection_indexes, limit, batch_size)
            except Exception as e:
                logger.error(f"Error in step {step_number} creating collection from pipeline: {str(e)}")
         
        step_number=6       
        if step is None or step == step_number:
            ### Create properties_media collection from a pipeline (Stage)
            self.stdout.write(self.style.WARNING(f'Step {step_number}: Creating properties_media collection from pipeline...'))
            try:
                if batch_size is None:
                    batch_size_ = 200000
                else:
                    batch_size_ = batch_size
                self.create_collection_from_pipeline(pipeline_v1_stage_properties_media, collection_indexes, limit, batch_size_)
            except Exception as e:
                logger.error(f"Error in step {step_number} creating properties_media collection from pipeline: {str(e)}")
        
        step_number=7
        if step is None or step == step_number:
            ### Create properties_nested collection from a pipeline (Stage)
            self.stdout.write(self.style.WARNING(f'Step {step_number}: Creating properties_nested collection from pipeline...'))
            try:
                if batch_size is None:
                    batch_size_ = 2000
                else:
                    batch_size_ = batch_size
                self.create_collection_from_pipeline(pipeline_v1_stage_properties_nested, collection_indexes, limit, batch_size_)
            except Exception as e:
                logger.error(f"Error in step {step_number} creating properties_nested collection from pipeline: {str(e)}")
        
        step_number=8
        if step is None or step == step_number:
            ### Change collection names
            self.stdout.write(self.style.WARNING(f'Step {step_number}: Changing collection names...'))
            try:
                self.change_collection_names(collection_names_to_be_changed)
            except Exception as e:
                logger.error(f"Error in step {step_number} changing collection names: {str(e)}")
        
        step_number=9
        if step is None or step == step_number:
            ### Drop Collections from MySql
            self.stdout.write(self.style.WARNING(f'Step {step_number}: Dropping collection that directly came from mysql...'))
            try:
                self.drop_from_mysql_collections([table_to_collection, table_media_to_collection, table_properties_to_collection],[collection_from_mysql])
            except Exception as e:
                logger.error(f"Error in step {step_number} dropping collection: {str(e)}")
                
                
        #End time
        end_time = time.time()
        total_time = round((end_time - start_time),2)
        self.stdout.write(self.style.SUCCESS(f'Total Time: {total_time}'))
        logger.info(f"Data Transfer Completed Successfully at {datetime.now()}. Total Time: {total_time}")



    ### Transfer Methods ###
    
    def transfer_mysql_to_mongodb(self, table_to_collection, limit=None, batch_size=None):
        try:
            start_table_time = time.time()
            
            # Loop through the table-to-collection mapping and transfer data
            for mysql_table, mongo_data in table_to_collection.items():
                mongo_collection_name = mongo_data[0]
                mongo_client, mongo_db = self.connect_to_mongodb()
                mysql_conn = self.connect_to_mysql()
                mysql_cursor = mysql_conn.cursor()
                
                mongo_db.drop_collection(mongo_collection_name)
                
                offset = 0
                total_rows_transferred = 0
                
                while True:
                    query = f"SELECT * FROM {mysql_table}"
                    if limit is not None:
                        query += f" LIMIT {limit}"
                    if batch_size is not None:
                        query += f" LIMIT {batch_size} OFFSET {offset}"
                    mysql_cursor.execute(query)
                    table_data = mysql_cursor.fetchall()
                    if not table_data:
                        break
                    # Fetch column names separately
                    column_names = [desc[0] for desc in mysql_cursor.description]
                    # Create a list to store the converted data
                    converted_data = []
                    
                    # Iterate through each row of data and convert it to a dictionary
                    for row in table_data:
                        row_dict = dict(zip(column_names, row))
                        # Convert Decimal objects to float before inserting into MongoDB
                        for key, value in row_dict.items():
                            if isinstance(value, decimal.Decimal):
                                row_dict[key] = float(value)
                            if key == "property_id":
                                row_dict[key] = int(value)
                        
                        converted_data.append(row_dict)
                    
                    mongo_collection = mongo_db[mongo_collection_name]
                    mongo_collection.insert_many(converted_data)
                    
                    if batch_size is None:
                        break

                    if batch_size is not None:
                        offset += batch_size
                        total_rows_transferred += len(table_data)
                        
                        self.stdout.write(self.style.SUCCESS(f'Batch of {len(table_data)} rows transferred from {mysql_table} to {mongo_collection_name}. Total rows transferred: {total_rows_transferred}'))
                
                indexes = list(mongo_data[1:])
                for index in indexes:
                    mongo_collection.create_index([(index, pymongo.ASCENDING)])
                
                mysql_cursor.close()
                mongo_client.close()
                
                self.stdout.write(self.style.SUCCESS(f'Data transfer from {mysql_table} to {mongo_collection_name} completed successfully!'))
        
        except Exception as e:
            logger.error(f"Error transferring data from {mysql_table} to {mongo_collection_name}: {str(e)}")
            self.stdout.write(self.style.ERROR(f'An error occurred: {str(e)}'))
        
        finally:
            if mysql_cursor:
                mysql_cursor.close()
            if mongo_client:
                mongo_client.close()
            total_table_stage_time = round((time.time() - start_table_time), 2)
            self.stdout.write(self.style.SUCCESS(f'Total Table Stage Time: {total_table_stage_time}'))
    
    
            
    def transfer_mysql_query_to_mongodb(self, collection_from_mysql, limit=None, batch_size=None):
        try:
            start_command_time = time.time()
            
            # Loop through the table-to-collection mapping and transfer data
            for mongo_collection_name, mongo_data in collection_from_mysql.items():
                mysql_query = mongo_data[0]
                mongo_client, mongo_db = self.connect_to_mongodb()
                mysql_conn = self.connect_to_mysql()
                mysql_cursor = mysql_conn.cursor()
                
                mongo_db.drop_collection(mongo_collection_name)
                
                offset = 0
                total_rows_transferred = 0
                
                while True:
                    query = f"{mysql_query}"
                    if limit is not None:
                        query += f" LIMIT {limit}"
                    if batch_size is not None:
                        query += f" LIMIT {batch_size} OFFSET {offset}"
                    mysql_cursor.execute(query)
                    table_data = mysql_cursor.fetchall()

                    if not table_data:
                        break

                    # Fetch column names separately
                    column_names = [desc[0] for desc in mysql_cursor.description]

                    # Create a list to store the converted data
                    converted_data = []

                    # Iterate through each row of data and convert it to a dictionary
                    for row in table_data:
                        row_dict = dict(zip(column_names, row))
                        
                        # Convert Decimal objects to float before inserting into MongoDB
                        for key, value in row_dict.items():
                            if isinstance(value, decimal.Decimal):
                                row_dict[key] = float(value)
                            if key == "property_id":
                                row_dict[key] = int(value)
                        
                        converted_data.append(row_dict)
                    
                    mongo_collection = mongo_db[mongo_collection_name]
                    mongo_collection.insert_many(converted_data)
                    
                    if batch_size is None:
                        break

                    if batch_size is not None:
                        offset += batch_size
                        total_rows_transferred += len(table_data)
                        
                        self.stdout.write(self.style.SUCCESS(f'Batch of {len(table_data)} rows transferred from mysql query to {mongo_collection_name}. Total rows transferred: {total_rows_transferred}'))
                
                indexes = list(mongo_data[1:])
                for index in indexes:
                    mongo_collection.create_index([(index, pymongo.ASCENDING)])
                
                mysql_cursor.close()
                mongo_client.close()
                
                self.stdout.write(self.style.SUCCESS(f'Data transfer from mysql query to {mongo_collection_name} completed successfully!'))
        
        except Exception as e:
            logger.error(f"Error transferring data from mysql query to {mongo_collection_name}: {str(e)}")
            self.stdout.write(self.style.ERROR(f'An error occurred: {str(e)}'))
        
        finally:
            if mysql_cursor:
                mysql_cursor.close()
            if mongo_client:
                mongo_client.close()
            total_command_stage_time = round((time.time() - start_command_time), 2)
            self.stdout.write(self.style.SUCCESS(f'Total MySql query Stage Time: {total_command_stage_time}'))




    def create_collection_from_pipeline(self, pipelines, indexes=None, limit=None, batch_size=None):
        try:
            start_pipeline_time = time.time()

            for mongo_collection_name, pipeline in pipelines.items():
                try:
                    mongo_client, mongo_db = self.connect_to_mongodb()
                    mongo_db.drop_collection(mongo_collection_name)
                    collection = mongo_db[mongo_collection_name]
                    
                    offset = 0
                    total_docs_transferred = 0

                    while True:
                        # Determine the size of the next batch
                        if limit is not None:
                            remaining_docs = limit - total_docs_transferred
                            current_batch_size = min(batch_size, remaining_docs) if batch_size is not None else remaining_docs
                            if remaining_docs <= 0:
                                break
                        else:
                            current_batch_size = batch_size

                        # Add limit and skip stages to the pipeline
                        batch_pipeline = pipeline.copy()
                        if current_batch_size is not None:
                            batch_pipeline.append({'$skip': offset})
                            batch_pipeline.append({'$limit': current_batch_size})

                        if limit is not None or batch_size is not None:
                            # Log the current state of the batch processing
                            self.stdout.write(self.style.SUCCESS(f'Processing batch: skip={offset}, limit={current_batch_size}'))

                        result = list(collection.aggregate(batch_pipeline))

                        if not result:
                            break

                        # Remove '_id' field to avoid duplication issues
                        for doc in result:
                            if '_id' in doc:
                                del doc['_id']

                        collection.insert_many(result)
                        
                        if batch_size is not None or limit is not None:
                            batch_docs_count = len(result)
                            total_docs_transferred += batch_docs_count
                            offset += current_batch_size  # Correctly update the offset for the next batch
                        else:
                            batch_docs_count = len(result)
                            total_docs_transferred = batch_docs_count
                            
                        self.stdout.write(self.style.SUCCESS(f'Batch of {batch_docs_count} documents transferred to {mongo_collection_name}. Total documents transferred: {total_docs_transferred}'))

                        if batch_size is None:
                            break
                    
                    # Creating indexes from collection_indexes
                    if mongo_collection_name in indexes:
                        for index in indexes[mongo_collection_name]:
                            collection.create_index([(index, pymongo.ASCENDING)])
                    
                    mongo_client.close()
                    #time.sleep(10)

                    self.stdout.write(self.style.SUCCESS(f'Collection {mongo_collection_name} successfully created!'))
                    
                except Exception as e:
                    logger.error(f"Error creating collection {mongo_collection_name}: {str(e)}")
                    self.stdout.write(self.style.ERROR(f'An error occurred: {str(e)}'))
        
        except Exception as e:
            logger.error(f"Error creating collection from pipeline: {str(e)}")
            self.stdout.write(self.style.ERROR(f'An error occurred: {str(e)}'))
        
        finally:
            if mongo_client:
                mongo_client.close()
            total_pipeline_time = round((time.time() - start_pipeline_time), 2)
            self.stdout.write(self.style.SUCCESS(f'Total Pipeline Time: {total_pipeline_time}'))



    
    def change_collection_names(self,dict):
        try:
            start_change_time = time.time()
            for old_name, new_name in dict.items():
                mongo_client, mongo_db = self.connect_to_mongodb()
                mongo_db.drop_collection(new_name)
                mongo_db[old_name].rename(new_name)
                mongo_client.close()
                self.stdout.write(self.style.SUCCESS(f'Collection {old_name} successfully renamed to {new_name}!'))
        except Exception as e:
            logger.error(f"Error changing collection names: {str(e)}")
            self.stdout.write(self.style.ERROR(f'An error occurred: {str(e)}'))
        finally:
            if mongo_client:
                mongo_client.close()
            total_change_time = round((time.time() - start_change_time),2)
            self.stdout.write(self.style.SUCCESS(
                    f'Total Change Time: {total_change_time}'
                    )
                )
    
    
    def drop_from_mysql_collections(self,dict1,dict2):
        try:
            start_drop_time = time.time()
            for table_to_collection in dict1:
                for mongo_data in table_to_collection.values():
                    mongo_client, mongo_db = self.connect_to_mongodb()
                    mongo_collection_name = mongo_data[0]
                    mongo_db.drop_collection(mongo_collection_name)
                    mongo_client.close()
                    self.stdout.write(self.style.SUCCESS(f'Collection {mongo_collection_name} successfully droped!'))
            for collection_from_mysql in dict2:
                for mongo_collection_name in collection_from_mysql.keys():
                    mongo_client, mongo_db = self.connect_to_mongodb()
                    mongo_db.drop_collection(mongo_collection_name)
                    mongo_client.close()
                    self.stdout.write(self.style.SUCCESS(f'Collection {mongo_collection_name} successfully droped!'))
        except Exception as e:
            logger.error(f"Error dropping collections: {str(e)}")
            self.stdout.write(self.style.ERROR(f'An error occurred: {str(e)}'))
        finally:
            if mongo_client:
                mongo_client.close()
            total_drop_time = round((time.time() - start_drop_time),2)
            self.stdout.write(self.style.SUCCESS(
                    f'Total Drop Time: {total_drop_time}'
                    )
                )
        

    @retry(pymysql.OperationalError, delay=5, tries=3)
    def connect_to_mysql(self):
        # Connect to MySQL using pymysql
        mysql_settings = settings.DATABASES['mysql']
        mysql_conn = pymysql.connect(
                    host=mysql_settings['HOST'],
                    port=mysql_settings['PORT'],
                    user=mysql_settings['USER'],
                    password=mysql_settings['PASSWORD'],
                    db=mysql_settings['NAME'],
                )
        return mysql_conn
    
    @retry(pymongo.errors.ServerSelectionTimeoutError, delay=5, tries=3)
    def connect_to_mongodb(self):
        # Get MongoDB configuration from settings.py
        mongo_settings = settings.DATABASES['mongodb']
        
        # Configure MongoDB connection using settings
        mongo_client = pymongo.MongoClient(
                    host=mongo_settings['CLIENT']['host'],
                    port=mongo_settings['CLIENT']['port'],
                    username=mongo_settings['CLIENT']['username'],
                    password=mongo_settings['CLIENT']['password'],
                    authSource=mongo_settings['CLIENT']['authSource'],
                    authMechanism=mongo_settings['CLIENT']['authMechanism'],
                )
        return mongo_client, mongo_client[mongo_settings['NAME']]
    
    ##	/home/remax/virtualenv/api/3.8/bin/python3 /home/remax/api/PortalREMax/manage.py v1_transfer_mysql_to_mongodb 5000 >> /home/remax/logs/v1_transfer_mysql_to_mongodb.log 2>&1