跳到主要内容

数据库集成✅

本主题涵盖Node.js与各种数据库的集成方案,包括关系型数据库、NoSQL数据库的连接和操作方法。

Node.js如何连接和操作MySQL数据库?

答案

核心概念

Node.js通过数据库驱动程序连接MySQL,常用的驱动包括mysql2mysql等。在生产环境中,通常使用连接池来管理数据库连接,提升性能和并发处理能力。

连接方式

1. 基础连接

const mysql = require('mysql2/promise');

const connection = await mysql.createConnection({
host: 'localhost',
user: 'root',
password: 'password',
database: 'mydb'
});

2. 连接池管理

const pool = mysql.createPool({
host: 'localhost',
user: 'root',
password: 'password',
database: 'mydb',
connectionLimit: 10,
acquireTimeout: 60000,
reconnect: true
});

3. 事务处理

const connection = await pool.getConnection();
await connection.beginTransaction();
try {
await connection.execute('INSERT INTO users ...');
await connection.execute('INSERT INTO profiles ...');
await connection.commit();
} catch (error) {
await connection.rollback();
throw error;
} finally {
connection.release();
}

示例实现:

// Node.js MySQL数据库连接和操作演示
const mysql = require('mysql2/promise');
const { EventEmitter } = require('events');

/**
 * 1. MySQL连接池管理器
 */
class MySQLConnectionManager {
  constructor(config = {}) {
    this.config = {
      host: config.host || 'localhost',
      port: config.port || 3306,
      user: config.user || 'root',
      password: config.password || 'password',
      database: config.database || 'test_db',
      connectionLimit: config.connectionLimit || 10,
      acquireTimeout: config.acquireTimeout || 60000,
      reconnect: config.reconnect !== false,
      ...config
    };
    
    this.pool = null;
    this.isConnected = false;
  }

  // 创建连接池
  async createPool() {
    try {
      this.pool = mysql.createPool({
        ...this.config,
        waitForConnections: true,
        queueLimit: 0
      });

      console.log('MySQL连接池创建成功');
      this.isConnected = true;
      
      // 测试连接
      await this.testConnection();
      return this.pool;
    } catch (error) {
      console.error('创建MySQL连接池失败:', error);
      throw error;
    }
  }

  // 测试连接
  async testConnection() {
    try {
      const connection = await this.pool.getConnection();
      await connection.ping();
      connection.release();
      console.log('MySQL连接测试成功');
      return true;
    } catch (error) {
      console.error('MySQL连接测试失败:', error);
      return false;
    }
  }

  // 执行查询
  async query(sql, params = []) {
    if (!this.pool) {
      throw new Error('连接池未初始化');
    }

    try {
      const [rows, fields] = await this.pool.execute(sql, params);
      return { rows, fields };
    } catch (error) {
      console.error('SQL执行错误:', error);
      throw error;
    }
  }

  // 开始事务
  async beginTransaction() {
    const connection = await this.pool.getConnection();
    await connection.beginTransaction();
    
    return {
      async query(sql, params) {
        const [rows] = await connection.execute(sql, params);
        return rows;
      },
      
      async commit() {
        await connection.commit();
        connection.release();
      },
      
      async rollback() {
        await connection.rollback();
        connection.release();
      }
    };
  }

  // 关闭连接池
  async close() {
    if (this.pool) {
      await this.pool.end();
      this.isConnected = false;
      console.log('MySQL连接池已关闭');
    }
  }

  // 获取连接池状态
  getPoolStatus() {
    if (!this.pool) {
      return { status: '未连接' };
    }

    return {
      totalConnections: this.pool._allConnections.length,
      freeConnections: this.pool._freeConnections.length,
      connectionLimit: this.config.connectionLimit,
      acquireTimeout: this.config.acquireTimeout
    };
  }
}

/**
 * 2. 数据访问对象(DAO)模式实现
 */
class UserDAO {
  constructor(connectionManager) {
    this.db = connectionManager;
  }

  // 创建用户表
  async createTable() {
    const sql = `
      CREATE TABLE IF NOT EXISTS users (
        id INT AUTO_INCREMENT PRIMARY KEY,
        username VARCHAR(50) NOT NULL UNIQUE,
        email VARCHAR(100) NOT NULL UNIQUE,
        password VARCHAR(255) NOT NULL,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
      )
    `;
    
    await this.db.query(sql);
    console.log('用户表创建成功');
  }

  // 创建用户
  async createUser(userData) {
    const { username, email, password } = userData;
    const sql = `
      INSERT INTO users (username, email, password)
      VALUES (?, ?, ?)
    `;
    
    const result = await this.db.query(sql, [username, email, password]);
    return {
      id: result.rows.insertId,
      username,
      email,
      created_at: new Date()
    };
  }

  // 根据ID查找用户
  async findById(id) {
    const sql = 'SELECT * FROM users WHERE id = ?';
    const result = await this.db.query(sql, [id]);
    return result.rows[0] || null;
  }

  // 根据用户名查找用户
  async findByUsername(username) {
    const sql = 'SELECT * FROM users WHERE username = ?';
    const result = await this.db.query(sql, [username]);
    return result.rows[0] || null;
  }

  // 获取所有用户
  async findAll(limit = 10, offset = 0) {
    const sql = 'SELECT id, username, email, created_at FROM users LIMIT ? OFFSET ?';
    const result = await this.db.query(sql, [limit, offset]);
    return result.rows;
  }

  // 更新用户
  async updateUser(id, updateData) {
    const fields = [];
    const values = [];
    
    for (const [key, value] of Object.entries(updateData)) {
      if (key !== 'id') {
        fields.push(`${key} = ?`);
        values.push(value);
      }
    }
    
    if (fields.length === 0) {
      throw new Error('没有要更新的字段');
    }
    
    values.push(id);
    const sql = `UPDATE users SET ${fields.join(', ')} WHERE id = ?`;
    
    const result = await this.db.query(sql, values);
    return result.rows.affectedRows > 0;
  }

  // 删除用户
  async deleteUser(id) {
    const sql = 'DELETE FROM users WHERE id = ?';
    const result = await this.db.query(sql, [id]);
    return result.rows.affectedRows > 0;
  }

  // 用户统计
  async getUserStats() {
    const sql = `
      SELECT 
        COUNT(*) as total_users,
        COUNT(CASE WHEN created_at >= DATE_SUB(NOW(), INTERVAL 30 DAY) THEN 1 END) as new_users_30_days,
        MIN(created_at) as first_user_date,
        MAX(created_at) as latest_user_date
      FROM users
    `;
    
    const result = await this.db.query(sql);
    return result.rows[0];
  }
}

/**
 * 3. 数据库迁移管理器
 */
class MigrationManager {
  constructor(connectionManager) {
    this.db = connectionManager;
    this.migrations = [];
  }

  // 创建迁移表
  async createMigrationsTable() {
    const sql = `
      CREATE TABLE IF NOT EXISTS migrations (
        id INT AUTO_INCREMENT PRIMARY KEY,
        name VARCHAR(255) NOT NULL,
        executed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
      )
    `;
    
    await this.db.query(sql);
    console.log('迁移表创建成功');
  }

  // 添加迁移
  addMigration(name, upFunction, downFunction) {
    this.migrations.push({
      name,
      up: upFunction,
      down: downFunction
    });
  }

  // 检查迁移是否已执行
  async isMigrationExecuted(name) {
    const sql = 'SELECT COUNT(*) as count FROM migrations WHERE name = ?';
    const result = await this.db.query(sql, [name]);
    return result.rows[0].count > 0;
  }

  // 执行迁移
  async runMigrations() {
    await this.createMigrationsTable();
    
    for (const migration of this.migrations) {
      const isExecuted = await this.isMigrationExecuted(migration.name);
      
      if (!isExecuted) {
        console.log(`执行迁移: ${migration.name}`);
        
        try {
          await migration.up(this.db);
          
          // 记录迁移执行
          const sql = 'INSERT INTO migrations (name) VALUES (?)';
          await this.db.query(sql, [migration.name]);
          
          console.log(`迁移 ${migration.name} 执行成功`);
        } catch (error) {
          console.error(`迁移 ${migration.name} 执行失败:`, error);
          throw error;
        }
      } else {
        console.log(`迁移 ${migration.name} 已执行,跳过`);
      }
    }
  }

  // 回滚迁移
  async rollbackMigration(name) {
    const migration = this.migrations.find(m => m.name === name);
    if (!migration) {
      throw new Error(`迁移 ${name} 不存在`);
    }

    const isExecuted = await this.isMigrationExecuted(name);
    if (!isExecuted) {
      console.log(`迁移 ${name} 尚未执行,无需回滚`);
      return;
    }

    try {
      await migration.down(this.db);
      
      // 删除迁移记录
      const sql = 'DELETE FROM migrations WHERE name = ?';
      await this.db.query(sql, [name]);
      
      console.log(`迁移 ${name} 回滚成功`);
    } catch (error) {
      console.error(`迁移 ${name} 回滚失败:`, error);
      throw error;
    }
  }
}

/**
 * 4. 查询构建器
 */
class QueryBuilder {
  constructor(table, db) {
    this.table = table;
    this.db = db;
    this.reset();
  }

  reset() {
    this._select = '*';
    this._where = [];
    this._joins = [];
    this._orderBy = [];
    this._groupBy = [];
    this._having = [];
    this._limit = null;
    this._offset = null;
    this._params = [];
    return this;
  }

  select(columns) {
    this._select = Array.isArray(columns) ? columns.join(', ') : columns;
    return this;
  }

  where(column, operator, value) {
    if (arguments.length === 2) {
      value = operator;
      operator = '=';
    }
    
    this._where.push(`${column} ${operator} ?`);
    this._params.push(value);
    return this;
  }

  whereIn(column, values) {
    const placeholders = values.map(() => '?').join(', ');
    this._where.push(`${column} IN (${placeholders})`);
    this._params.push(...values);
    return this;
  }

  join(table, condition) {
    this._joins.push(`JOIN ${table} ON ${condition}`);
    return this;
  }

  leftJoin(table, condition) {
    this._joins.push(`LEFT JOIN ${table} ON ${condition}`);
    return this;
  }

  orderBy(column, direction = 'ASC') {
    this._orderBy.push(`${column} ${direction}`);
    return this;
  }

  groupBy(column) {
    this._groupBy.push(column);
    return this;
  }

  having(condition) {
    this._having.push(condition);
    return this;
  }

  limit(count) {
    this._limit = count;
    return this;
  }

  offset(count) {
    this._offset = count;
    return this;
  }

  // 构建查询SQL
  buildQuery() {
    let sql = `SELECT ${this._select} FROM ${this.table}`;
    
    if (this._joins.length > 0) {
      sql += ' ' + this._joins.join(' ');
    }
    
    if (this._where.length > 0) {
      sql += ' WHERE ' + this._where.join(' AND ');
    }
    
    if (this._groupBy.length > 0) {
      sql += ' GROUP BY ' + this._groupBy.join(', ');
    }
    
    if (this._having.length > 0) {
      sql += ' HAVING ' + this._having.join(' AND ');
    }
    
    if (this._orderBy.length > 0) {
      sql += ' ORDER BY ' + this._orderBy.join(', ');
    }
    
    if (this._limit !== null) {
      sql += ` LIMIT ${this._limit}`;
    }
    
    if (this._offset !== null) {
      sql += ` OFFSET ${this._offset}`;
    }
    
    return { sql, params: this._params };
  }

  // 执行查询
  async get() {
    const { sql, params } = this.buildQuery();
    const result = await this.db.query(sql, params);
    this.reset();
    return result.rows;
  }

  // 获取第一条记录
  async first() {
    const results = await this.limit(1).get();
    return results[0] || null;
  }

  // 获取记录数
  async count() {
    this._select = 'COUNT(*) as count';
    const result = await this.first();
    return result ? result.count : 0;
  }
}

/**
 * 演示用法
 */
async function demonstrateMySQLOperations() {
  console.log('=== Node.js MySQL操作演示 ===\n');

  // 由于演示环境限制,我们使用模拟的方式
  console.log('注意: 这是一个模拟演示,实际使用需要真实的MySQL数据库\n');

  // 1. 连接管理演示
  console.log('1. 连接池管理演示:');
  const dbConfig = {
    host: 'localhost',
    user: 'root',
    password: 'password',
    database: 'demo_db',
    connectionLimit: 10
  };
  
  const db = new MySQLConnectionManager(dbConfig);
  console.log('MySQL连接配置:', dbConfig);
  console.log('连接池状态: 未连接\n');

  // 2. DAO模式演示
  console.log('2. DAO模式操作示例:');
  const userDAO = new UserDAO(db);
  
  console.log('创建用户示例:');
  const newUser = {
    username: 'john_doe',
    email: 'john@example.com',
    password: 'hashed_password'
  };
  console.log('用户数据:', newUser);
  
  console.log('查询用户示例:');
  console.log('SQL: SELECT * FROM users WHERE username = ?');
  console.log('参数: ["john_doe"]');
  
  console.log('更新用户示例:');
  console.log('SQL: UPDATE users SET email = ? WHERE id = ?');
  console.log('参数: ["newemail@example.com", 1]\n');

  // 3. 查询构建器演示
  console.log('3. 查询构建器示例:');
  const qb = new QueryBuilder('users', db);
  
  const query1 = qb
    .select(['id', 'username', 'email'])
    .where('status', 'active')
    .where('created_at', '>', '2023-01-01')
    .orderBy('created_at', 'DESC')
    .limit(10)
    .buildQuery();
  
  console.log('构建的查询:');
  console.log('SQL:', query1.sql);
  console.log('参数:', query1.params);
  
  const query2 = new QueryBuilder('users', db)
    .select('users.*, profiles.avatar')
    .leftJoin('profiles', 'users.id = profiles.user_id')
    .whereIn('users.id', [1, 2, 3])
    .buildQuery();
  
  console.log('\nJOIN查询示例:');
  console.log('SQL:', query2.sql);
  console.log('参数:', query2.params);

  // 4. 事务示例
  console.log('\n4. 事务操作示例:');
  console.log('开始事务 -> 插入用户 -> 插入用户资料 -> 提交事务');
  console.log(`
    const transaction = await db.beginTransaction();
    try {
      await transaction.query('INSERT INTO users ...', params1);
      await transaction.query('INSERT INTO profiles ...', params2);
      await transaction.commit();
    } catch (error) {
      await transaction.rollback();
      throw error;
    }
  `);

  // 5. 迁移管理示例
  console.log('\n5. 数据库迁移示例:');
  const migration = new MigrationManager(db);
  
  migration.addMigration('001_create_users_table', 
    // UP
    async (db) => {
      await db.query(`
        CREATE TABLE users (
          id INT AUTO_INCREMENT PRIMARY KEY,
          username VARCHAR(50) UNIQUE,
          email VARCHAR(100) UNIQUE,
          created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
      `);
    },
    // DOWN  
    async (db) => {
      await db.query('DROP TABLE users');
    }
  );
  
  console.log('迁移已添加: 001_create_users_table');
  console.log('执行: migration.runMigrations()');
  console.log('回滚: migration.rollbackMigration("001_create_users_table")');

  console.log('\n演示完成!实际项目中需要连接真实的MySQL数据库。');
}

// 如果直接运行此文件
if (require.main === module) {
  demonstrateMySQLOperations().catch(console.error);
}

module.exports = {
  MySQLConnectionManager,
  UserDAO,
  MigrationManager,
  QueryBuilder,
  demonstrateMySQLOperations
};

数据访问模式:

DAO模式(Data Access Object):

  • 封装数据库操作逻辑
  • 提供统一的数据访问接口
  • 便于单元测试和维护

查询构建器:

  • 动态构建SQL查询
  • 类型安全的查询接口
  • 支持复杂的查询条件

面试官视角:

该题考察候选人对Node.js数据库集成的理解:

  • 要点清单: 掌握数据库驱动使用;理解连接池原理;能处理事务操作;了解安全防护
  • 加分项: 有生产环境经验;了解性能优化;掌握数据库设计;能处理并发问题
  • 常见失误: 不理解连接池;忽视SQL注入;缺乏事务处理;不考虑错误处理

延伸阅读:

Node.js如何操作MongoDB数据库?

答案

核心概念

MongoDB是面向文档的NoSQL数据库,Node.js通过官方驱动mongodb或ODM框架Mongoose进行操作。MongoDB的文档结构灵活,支持复杂的数据类型和嵌套结构。

连接和基础操作

1. 官方驱动连接

const { MongoClient } = require('mongodb');

const client = new MongoClient(uri, {
maxPoolSize: 10,
serverSelectionTimeoutMS: 5000
});

await client.connect();
const db = client.db('myapp');
const collection = db.collection('users');

2. 文档操作

// 创建
await collection.insertOne(document);
await collection.insertMany(documents);

// 查询
await collection.findOne({ _id: objectId });
await collection.find(filter).toArray();

// 更新
await collection.updateOne(filter, { $set: update });
await collection.updateMany(filter, { $set: update });

// 删除
await collection.deleteOne(filter);
await collection.deleteMany(filter);

3. 聚合查询

const pipeline = [
{ $match: { status: 'active' } },
{ $group: { _id: '$category', count: { $sum: 1 } } },
{ $sort: { count: -1 } }
];

await collection.aggregate(pipeline).toArray();

示例实现:

// Node.js MongoDB数据库操作演示
const { MongoClient, ObjectId } = require('mongodb');
const { EventEmitter } = require('events');

/**
 * 1. MongoDB连接管理器
 */
class MongoDBConnectionManager extends EventEmitter {
  constructor(options = {}) {
    super();
    this.uri = options.uri || 'mongodb://localhost:27017';
    this.dbName = options.dbName || 'demo_db';
    this.options = {
      maxPoolSize: options.maxPoolSize || 10,
      serverSelectionTimeoutMS: options.serverSelectionTimeoutMS || 5000,
      socketTimeoutMS: options.socketTimeoutMS || 45000,
      ...options
    };
    
    this.client = null;
    this.db = null;
    this.isConnected = false;
  }

  // 连接数据库
  async connect() {
    try {
      console.log(`连接MongoDB: ${this.uri}/${this.dbName}`);
      
      this.client = new MongoClient(this.uri, this.options);
      await this.client.connect();
      
      this.db = this.client.db(this.dbName);
      this.isConnected = true;
      
      console.log('MongoDB连接成功');
      this.emit('connected');
      
      // 监听连接事件
      this.setupEventListeners();
      
      return this.db;
    } catch (error) {
      console.error('MongoDB连接失败:', error);
      this.emit('error', error);
      throw error;
    }
  }

  // 设置事件监听器
  setupEventListeners() {
    this.client.on('close', () => {
      console.log('MongoDB连接关闭');
      this.isConnected = false;
      this.emit('disconnected');
    });

    this.client.on('error', (error) => {
      console.error('MongoDB连接错误:', error);
      this.emit('error', error);
    });

    this.client.on('reconnect', () => {
      console.log('MongoDB重连成功');
      this.isConnected = true;
      this.emit('reconnected');
    });
  }

  // 获取集合
  collection(name) {
    if (!this.isConnected || !this.db) {
      throw new Error('数据库未连接');
    }
    return this.db.collection(name);
  }

  // 关闭连接
  async close() {
    if (this.client) {
      await this.client.close();
      this.isConnected = false;
      console.log('MongoDB连接已关闭');
    }
  }

  // 获取数据库统计信息
  async getStats() {
    if (!this.isConnected || !this.db) {
      throw new Error('数据库未连接');
    }

    try {
      const stats = await this.db.stats();
      return {
        database: this.dbName,
        collections: stats.collections,
        objects: stats.objects,
        dataSize: stats.dataSize,
        storageSize: stats.storageSize,
        indexes: stats.indexes,
        indexSize: stats.indexSize
      };
    } catch (error) {
      console.error('获取数据库统计信息失败:', error);
      throw error;
    }
  }

  // 测试连接
  async ping() {
    if (!this.isConnected || !this.db) {
      throw new Error('数据库未连接');
    }

    try {
      await this.db.admin().ping();
      return true;
    } catch (error) {
      console.error('数据库ping失败:', error);
      return false;
    }
  }
}

/**
 * 2. MongoDB模型基类
 */
class BaseModel {
  constructor(collectionName, db) {
    this.collectionName = collectionName;
    this.db = db;
    this.collection = db.collection(collectionName);
  }

  // 创建文档
  async create(document) {
    try {
      const result = await this.collection.insertOne({
        ...document,
        createdAt: new Date(),
        updatedAt: new Date()
      });
      
      return {
        _id: result.insertedId,
        ...document,
        createdAt: new Date(),
        updatedAt: new Date()
      };
    } catch (error) {
      console.error('创建文档失败:', error);
      throw error;
    }
  }

  // 批量创建
  async createMany(documents) {
    try {
      const now = new Date();
      const documentsWithTimestamps = documents.map(doc => ({
        ...doc,
        createdAt: now,
        updatedAt: now
      }));
      
      const result = await this.collection.insertMany(documentsWithTimestamps);
      return result.insertedIds;
    } catch (error) {
      console.error('批量创建文档失败:', error);
      throw error;
    }
  }

  // 根据ID查找
  async findById(id) {
    try {
      return await this.collection.findOne({ _id: new ObjectId(id) });
    } catch (error) {
      console.error('根据ID查找文档失败:', error);
      throw error;
    }
  }

  // 查找单个文档
  async findOne(filter = {}, options = {}) {
    try {
      return await this.collection.findOne(filter, options);
    } catch (error) {
      console.error('查找单个文档失败:', error);
      throw error;
    }
  }

  // 查找多个文档
  async find(filter = {}, options = {}) {
    try {
      return await this.collection.find(filter, options).toArray();
    } catch (error) {
      console.error('查找文档失败:', error);
      throw error;
    }
  }

  // 分页查找
  async findPaginated(filter = {}, options = {}) {
    const { page = 1, limit = 10, sort = { createdAt: -1 } } = options;
    const skip = (page - 1) * limit;

    try {
      const [documents, total] = await Promise.all([
        this.collection.find(filter)
          .sort(sort)
          .skip(skip)
          .limit(limit)
          .toArray(),
        this.collection.countDocuments(filter)
      ]);

      return {
        documents,
        pagination: {
          page,
          limit,
          total,
          pages: Math.ceil(total / limit),
          hasNext: page * limit < total,
          hasPrev: page > 1
        }
      };
    } catch (error) {
      console.error('分页查找失败:', error);
      throw error;
    }
  }

  // 更新文档
  async updateById(id, update) {
    try {
      const result = await this.collection.updateOne(
        { _id: new ObjectId(id) },
        { 
          $set: { 
            ...update, 
            updatedAt: new Date() 
          } 
        }
      );
      
      return result.modifiedCount > 0;
    } catch (error) {
      console.error('根据ID更新文档失败:', error);
      throw error;
    }
  }

  // 批量更新
  async updateMany(filter, update) {
    try {
      const result = await this.collection.updateMany(
        filter,
        { 
          $set: { 
            ...update, 
            updatedAt: new Date() 
          } 
        }
      );
      
      return result.modifiedCount;
    } catch (error) {
      console.error('批量更新文档失败:', error);
      throw error;
    }
  }

  // 删除文档
  async deleteById(id) {
    try {
      const result = await this.collection.deleteOne({ _id: new ObjectId(id) });
      return result.deletedCount > 0;
    } catch (error) {
      console.error('根据ID删除文档失败:', error);
      throw error;
    }
  }

  // 批量删除
  async deleteMany(filter) {
    try {
      const result = await this.collection.deleteMany(filter);
      return result.deletedCount;
    } catch (error) {
      console.error('批量删除文档失败:', error);
      throw error;
    }
  }

  // 统计文档数量
  async count(filter = {}) {
    try {
      return await this.collection.countDocuments(filter);
    } catch (error) {
      console.error('统计文档数量失败:', error);
      throw error;
    }
  }

  // 聚合查询
  async aggregate(pipeline) {
    try {
      return await this.collection.aggregate(pipeline).toArray();
    } catch (error) {
      console.error('聚合查询失败:', error);
      throw error;
    }
  }

  // 创建索引
  async createIndex(keys, options = {}) {
    try {
      return await this.collection.createIndex(keys, options);
    } catch (error) {
      console.error('创建索引失败:', error);
      throw error;
    }
  }

  // 获取索引信息
  async getIndexes() {
    try {
      return await this.collection.indexes();
    } catch (error) {
      console.error('获取索引信息失败:', error);
      throw error;
    }
  }
}

/**
 * 3. 用户模型示例
 */
class UserModel extends BaseModel {
  constructor(db) {
    super('users', db);
  }

  // 初始化索引
  async initIndexes() {
    await this.createIndex({ username: 1 }, { unique: true });
    await this.createIndex({ email: 1 }, { unique: true });
    await this.createIndex({ createdAt: -1 });
    console.log('用户模型索引创建完成');
  }

  // 根据用户名查找用户
  async findByUsername(username) {
    return await this.findOne({ username });
  }

  // 根据邮箱查找用户
  async findByEmail(email) {
    return await this.findOne({ email });
  }

  // 用户统计
  async getUserStats() {
    const pipeline = [
      {
        $group: {
          _id: null,
          totalUsers: { $sum: 1 },
          avgAge: { $avg: '$age' },
          minAge: { $min: '$age' },
          maxAge: { $max: '$age' }
        }
      }
    ];

    const result = await this.aggregate(pipeline);
    return result[0] || {};
  }

  // 按年龄分组统计
  async getUsersByAgeGroup() {
    const pipeline = [
      {
        $bucket: {
          groupBy: '$age',
          boundaries: [18, 25, 35, 45, 55, 100],
          default: 'Unknown',
          output: {
            count: { $sum: 1 },
            users: { $push: { username: '$username', age: '$age' } }
          }
        }
      }
    ];

    return await this.aggregate(pipeline);
  }

  // 活跃用户查询
  async getActiveUsers(days = 30) {
    const cutoffDate = new Date();
    cutoffDate.setDate(cutoffDate.getDate() - days);

    return await this.find({
      lastLoginAt: { $gte: cutoffDate }
    });
  }
}

/**
 * 4. MongoDB查询构建器
 */
class MongoQueryBuilder {
  constructor(collection) {
    this.collection = collection;
    this.reset();
  }

  reset() {
    this._filter = {};
    this._sort = {};
    this._limit = null;
    this._skip = null;
    this._projection = null;
    return this;
  }

  // 条件查询
  where(field, operator, value) {
    if (arguments.length === 2) {
      value = operator;
      operator = '$eq';
    }

    // 操作符映射
    const operatorMap = {
      '=': '$eq',
      '!=': '$ne',
      '>': '$gt',
      '>=': '$gte',
      '<': '$lt',
      '<=': '$lte',
      'in': '$in',
      'nin': '$nin'
    };

    const mongoOperator = operatorMap[operator] || operator;
    
    if (mongoOperator === '$eq') {
      this._filter[field] = value;
    } else {
      this._filter[field] = { [mongoOperator]: value };
    }

    return this;
  }

  // 范围查询
  whereBetween(field, min, max) {
    this._filter[field] = { $gte: min, $lte: max };
    return this;
  }

  // 正则查询
  whereRegex(field, pattern, options = 'i') {
    this._filter[field] = { $regex: pattern, $options: options };
    return this;
  }

  // 存在性查询
  whereExists(field, exists = true) {
    this._filter[field] = { $exists: exists };
    return this;
  }

  // 数组查询
  whereIn(field, values) {
    this._filter[field] = { $in: values };
    return this;
  }

  whereNotIn(field, values) {
    this._filter[field] = { $nin: values };
    return this;
  }

  // 逻辑查询
  whereOr(conditions) {
    this._filter.$or = conditions;
    return this;
  }

  whereAnd(conditions) {
    this._filter.$and = conditions;
    return this;
  }

  // 排序
  orderBy(field, direction = 1) {
    this._sort[field] = direction === 'desc' || direction === -1 ? -1 : 1;
    return this;
  }

  // 限制数量
  limit(count) {
    this._limit = count;
    return this;
  }

  // 跳过数量
  skip(count) {
    this._skip = count;
    return this;
  }

  // 字段选择
  select(fields) {
    if (typeof fields === 'string') {
      fields = fields.split(' ');
    }
    
    this._projection = {};
    fields.forEach(field => {
      this._projection[field] = 1;
    });
    
    return this;
  }

  // 执行查询
  async get() {
    let query = this.collection.find(this._filter);
    
    if (this._projection) {
      query = query.project(this._projection);
    }
    
    if (Object.keys(this._sort).length > 0) {
      query = query.sort(this._sort);
    }
    
    if (this._skip !== null) {
      query = query.skip(this._skip);
    }
    
    if (this._limit !== null) {
      query = query.limit(this._limit);
    }
    
    const result = await query.toArray();
    this.reset();
    return result;
  }

  // 获取第一条记录
  async first() {
    const results = await this.limit(1).get();
    return results[0] || null;
  }

  // 获取总数
  async count() {
    const count = await this.collection.countDocuments(this._filter);
    this.reset();
    return count;
  }

  // 构建聚合管道
  buildAggregatePipeline() {
    const pipeline = [];
    
    if (Object.keys(this._filter).length > 0) {
      pipeline.push({ $match: this._filter });
    }
    
    if (Object.keys(this._sort).length > 0) {
      pipeline.push({ $sort: this._sort });
    }
    
    if (this._skip !== null) {
      pipeline.push({ $skip: this._skip });
    }
    
    if (this._limit !== null) {
      pipeline.push({ $limit: this._limit });
    }
    
    if (this._projection) {
      pipeline.push({ $project: this._projection });
    }
    
    return pipeline;
  }
}

/**
 * 演示用法
 */
async function demonstrateMongoDBOperations() {
  console.log('=== Node.js MongoDB操作演示 ===\n');
  console.log('注意: 这是一个模拟演示,实际使用需要真实的MongoDB数据库\n');

  // 1. 连接管理演示
  console.log('1. MongoDB连接管理:');
  const dbManager = new MongoDBConnectionManager({
    uri: 'mongodb://localhost:27017',
    dbName: 'demo_app',
    maxPoolSize: 10
  });

  console.log('连接配置:', {
    uri: 'mongodb://localhost:27017',
    dbName: 'demo_app',
    maxPoolSize: 10
  });

  // 模拟连接事件
  dbManager.on('connected', () => {
    console.log('事件: 数据库连接成功');
  });

  dbManager.on('error', (error) => {
    console.log('事件: 连接错误 -', error.message);
  });

  console.log('连接状态: 模拟已连接\n');

  // 2. 基础CRUD操作演示
  console.log('2. 基础CRUD操作示例:');
  
  console.log('创建用户:');
  const newUser = {
    username: 'john_doe',
    email: 'john@example.com',
    age: 28,
    profile: {
      firstName: 'John',
      lastName: 'Doe',
      avatar: 'https://example.com/avatar.jpg'
    },
    tags: ['developer', 'nodejs', 'mongodb'],
    lastLoginAt: new Date()
  };
  console.log(JSON.stringify(newUser, null, 2));

  console.log('\n查询用户:');
  console.log('filter: { username: "john_doe" }');
  console.log('projection: { password: 0 }');

  console.log('\n更新用户:');
  console.log('filter: { _id: ObjectId("...") }');
  console.log('update: { $set: { lastLoginAt: new Date() } }');

  console.log('\n删除用户:');
  console.log('filter: { _id: ObjectId("...") }\n');

  // 3. 查询构建器演示
  console.log('3. 查询构建器示例:');
  const mockCollection = {
    find: (filter) => ({ 
      project: () => ({ 
        sort: () => ({ 
          skip: () => ({ 
            limit: () => ({ 
              toArray: async () => [] 
            }) 
          }) 
        }) 
      }) 
    }),
    countDocuments: async () => 0
  };

  const qb = new MongoQueryBuilder(mockCollection);
  
  console.log('复杂查询构建:');
  console.log(`
    qb.where('age', '>=', 18)
      .where('status', 'active')
      .whereIn('tags', ['developer', 'designer'])
      .orderBy('createdAt', -1)
      .limit(10)
      .select('username email profile')
      .get()
  `);

  console.log('构建的过滤条件:', {
    age: { $gte: 18 },
    status: 'active',
    tags: { $in: ['developer', 'designer'] }
  });

  // 4. 聚合查询演示
  console.log('\n4. 聚合查询示例:');
  console.log('用户年龄分组统计:');
  const ageGroupPipeline = [
    {
      $bucket: {
        groupBy: '$age',
        boundaries: [18, 25, 35, 45, 55, 100],
        default: 'Unknown',
        output: {
          count: { $sum: 1 },
          avgAge: { $avg: '$age' }
        }
      }
    }
  ];
  console.log(JSON.stringify(ageGroupPipeline, null, 2));

  console.log('\n标签使用统计:');
  const tagStatsPipeline = [
    { $unwind: '$tags' },
    { $group: { _id: '$tags', count: { $sum: 1 } } },
    { $sort: { count: -1 } },
    { $limit: 10 }
  ];
  console.log(JSON.stringify(tagStatsPipeline, null, 2));

  // 5. 索引管理演示
  console.log('\n5. 索引管理示例:');
  console.log('创建索引:');
  console.log('db.users.createIndex({ username: 1 }, { unique: true })');
  console.log('db.users.createIndex({ email: 1 }, { unique: true })');
  console.log('db.users.createIndex({ "profile.firstName": 1, "profile.lastName": 1 })');
  console.log('db.users.createIndex({ tags: 1 })');
  console.log('db.users.createIndex({ createdAt: -1 })');

  // 6. 事务演示(4.0+)
  console.log('\n6. 事务操作示例:');
  console.log(`
    const session = client.startSession();
    try {
      await session.withTransaction(async () => {
        await users.insertOne(userData, { session });
        await profiles.insertOne(profileData, { session });
        await logs.insertOne(logData, { session });
      });
    } finally {
      await session.endSession();
    }
  `);

  // 7. 性能优化建议
  console.log('\n7. MongoDB性能优化建议:');
  console.log('- 为常用查询字段创建索引');
  console.log('- 使用复合索引优化多字段查询');
  console.log('- 避免全表扫描,使用合适的过滤条件');
  console.log('- 使用聚合管道进行复杂数据分析');
  console.log('- 控制查询返回的字段数量');
  console.log('- 使用连接池管理数据库连接');
  console.log('- 定期监控慢查询日志');

  console.log('\n演示完成!');
}

// 如果直接运行此文件
if (require.main === module) {
  demonstrateMongoDBOperations().catch(console.error);
}

module.exports = {
  MongoDBConnectionManager,
  BaseModel,
  UserModel,
  MongoQueryBuilder,
  demonstrateMongoDBOperations
};

MongoDB vs 关系型数据库:

特性MongoDBMySQL
数据模型文档型关系型
Schema灵活固定
查询语言MQLSQL
事务支持4.0+完整支持
扩展性水平扩展垂直扩展
适用场景内容管理、实时分析事务系统、报表

Mongoose ODM:

const mongoose = require('mongoose');

const userSchema = new mongoose.Schema({
username: { type: String, required: true, unique: true },
email: { type: String, required: true, unique: true },
profile: {
firstName: String,
lastName: String,
avatar: String
},
createdAt: { type: Date, default: Date.now }
});

const User = mongoose.model('User', userSchema);

// 使用模型
const user = new User(userData);
await user.save();

性能优化:

  • 索引策略: 为常用查询字段创建索引
  • 查询优化: 使用explain()分析查询性能
  • 连接池: 合理配置连接池大小
  • 聚合优化: 在管道早期进行过滤操作

面试官视角

该题考察候选人对NoSQL数据库的理解:

  • 要点清单: 掌握MongoDB基本操作;理解文档模型;了解聚合查询;知道索引优化
  • 加分项: 有大数据处理经验;了解分片和复制;掌握性能调优;理解CAP理论
  • 常见失误: 不理解文档结构;滥用嵌套文档;忽视索引重要性;不了解聚合管道

延伸阅读

数据库连接池的作用和配置是什么?

答案

核心概念

数据库连接池是一种资源管理技术,预先创建和管理一定数量的数据库连接,供应用程序复用。连接池可以显著提升数据库操作性能,减少连接建立和销毁的开销。

连接池原理

1. 生命周期管理

应用启动 → 预创建连接 → 连接池就绪 → 分配连接 → 回收连接 → 应用关闭

2. 连接分配流程

  • 应用请求数据库操作
  • 从池中获取空闲连接
  • 执行数据库操作
  • 将连接归还到池中
  • 连接可被其他请求复用

3. 关键配置参数

const poolConfig = {
// 基础配置
connectionLimit: 10, // 最大连接数
acquireTimeout: 60000, // 获取连接超时时间
timeout: 60000, // SQL执行超时时间

// 连接管理
reconnect: true, // 自动重连
idleTimeout: 300000, // 空闲连接超时(5分钟)
maxLifetime: 1800000, // 连接最大生命周期(30分钟)

// 队列管理
queueLimit: 0, // 等待队列大小(0=无限制)

// 健康检查
testOnBorrow: true, // 获取连接时测试
validationQuery: 'SELECT 1' // 连接验证查询
};

优势和价值:

1. 性能提升

  • 减少连接开销: 避免频繁创建/销毁连接
  • 复用现有连接: 连接可被多个请求共享
  • 并发处理: 支持多个并发数据库操作
  • 资源优化: 合理利用系统资源

2. 稳定性保障

  • 连接限制: 防止数据库连接数过多
  • 故障恢复: 自动重建失效连接
  • 超时控制: 防止连接长时间占用
  • 监控统计: 实时监控连接池状态

配置最佳实践:

MySQL连接池配置:

const mysql = require('mysql2');

const pool = mysql.createPool({
host: process.env.DB_HOST,
port: process.env.DB_PORT || 3306,
user: process.env.DB_USER,
password: process.env.DB_PASSWORD,
database: process.env.DB_NAME,

// 连接池大小 = CPU核心数 * 2 + 有效磁盘数
connectionLimit: 20,

// 超时设置
acquireTimeout: 60000,
timeout: 60000,

// 连接管理
reconnect: true,
multipleStatements: false, // 安全考虑

// SSL配置(生产环境)
ssl: process.env.NODE_ENV === 'production' ? {
rejectUnauthorized: false
} : false
});

MongoDB连接池配置:

const { MongoClient } = require('mongodb');

const client = new MongoClient(uri, {
// 连接池配置
maxPoolSize: 50, // 最大连接数
minPoolSize: 5, // 最小连接数
maxIdleTimeMS: 300000, // 连接空闲超时

// 服务器选择
serverSelectionTimeoutMS: 5000,
socketTimeoutMS: 45000,

// 重试配置
retryWrites: true,
retryReads: true,

// 压缩
compressors: ['zlib']
});

监控和调优:

1. 关键指标

// 获取连接池状态
const getPoolStats = () => ({
totalConnections: pool._allConnections.length,
activeConnections: pool._allConnections.length - pool._freeConnections.length,
freeConnections: pool._freeConnections.length,
queuedRequests: pool._connectionQueue.length
});

// 定期监控
setInterval(() => {
const stats = getPoolStats();
console.log('连接池状态:', stats);

// 告警逻辑
if (stats.queuedRequests > 10) {
console.warn('连接池队列积压严重');
}
}, 30000);

2. 性能调优建议

  • 连接数设置: 根据CPU核心数和并发量调整
  • 超时配置: 平衡响应时间和资源占用
  • 连接验证: 启用连接健康检查
  • 监控告警: 设置关键指标监控

常见问题和解决方案:

1. 连接泄漏

// ❌ 错误 - 连接未释放
const connection = await pool.getConnection();
const results = await connection.query(sql);
// 忘记释放连接

// ✅ 正确 - 确保连接释放
const connection = await pool.getConnection();
try {
const results = await connection.query(sql);
return results;
} finally {
connection.release(); // 确保释放
}

2. 连接超时

// 增加超时时间和重试机制
const queryWithRetry = async (sql, params, retries = 3) => {
for (let i = 0; i < retries; i++) {
try {
return await pool.execute(sql, params);
} catch (error) {
if (error.code === 'PROTOCOL_CONNECTION_LOST' && i < retries - 1) {
console.warn(`连接丢失,重试第${i + 1}`);
continue;
}
throw error;
}
}
};

面试官视角:

该题考察候选人对数据库连接管理的理解:

  • 要点清单: 理解连接池原理;掌握配置参数;了解性能优化;能监控和调优
  • 加分项: 有生产环境调优经验;了解不同数据库差异;能处理连接问题;有监控经验
  • 常见失误: 不理解连接池价值;配置不合理;忽视监控;不处理连接泄漏

延伸阅读: