MiniOB实现多表查询和聚合函数功能

实现能够支持多张表的笛卡尔积关联查询。请注意查询语句可能会带条件。查询结果展示的格式可以参考单表查询,每一列必须带有表信息,比如:

t1.id | t2.id
1 | 1

要求能够成功执行下面测试语句

create table t1(id int, sex int);
create table t2(id int, age int);
create table t3(id int, salary int);
select t1.*,t2.* from t1,t2;
select t1.id,t2.id from t1,t2;
select * from t1,t2;
select * from t1,t2 where t1.id=t2.id and t2.age > 10;
select * from t1,t2,t3;

实现该功能只需要修改 sql/executor/execute_stage.cpp 中 do_select 函数的功能,当然还需要实现一些别的函数来计算笛卡尔积和验证连接条件。

// Executes a SELECT statement (skeleton to be completed in this exercise).
// NOTE: some input is not validated here, e.g. the selected column names and
// the column names used in WHERE conditions; that validation still needs to be
// added. It could also live in the resolve stage, but keeping it next to
// execution is fine as well.
RC ExecuteStage::do_select(const char *db, const Query *sql,
                           SessionEvent *session_event) {
  RC rc = RC::SUCCESS;
  Session *session = session_event->get_client()->session;
  Trx *trx = session->current_trx();
  const Selects &selects = sql->sstr.selection;

  // For every table, collect the conditions that only involve that table and
  // build the bottom-level select execution node.
  std::vector<SelectExeNode *> select_nodes;
  for (size_t i = 0; i < selects.relation_num; i++) {
    const char *table_name = selects.relations[i];
    SelectExeNode *select_node = new SelectExeNode;
    rc = create_selection_executor(trx, selects, db, table_name, *select_node);
    if (rc != RC::SUCCESS) {
      delete select_node;
      for (SelectExeNode *&tmp_node: select_nodes) {
        delete tmp_node;
      }
      end_trx_if_need(session, trx, false);
      return rc;
    }
    select_nodes.push_back(select_node);
  }
  if (select_nodes.empty()) {
    LOG_ERROR("No table given");
    end_trx_if_need(session, trx, false);
    return RC::SQL_SYNTAX;
  }

  std::vector<TupleSet> tuple_sets;
  for (SelectExeNode *&node: select_nodes) {
    TupleSet tuple_set;
    rc = node->execute(tuple_set);
    if (rc != RC::SUCCESS) {
      for (SelectExeNode *&tmp_node: select_nodes) {
        delete tmp_node;
      }
      end_trx_if_need(session, trx, false);
      return rc;
    } else {
      tuple_sets.push_back(std::move(tuple_set));
    }
  }

  std::stringstream ss;
  TupleSet print_tuples;
  if (tuple_sets.size() > 1) {
    // Several tables are queried, so a join is needed.
    TupleSchema join_schema;
    TupleSchema old_schema;
    for (std::vector<TupleSet>::const_reverse_iterator
                 rit = tuple_sets.rbegin(),
                 rend = tuple_sets.rend();
         rit != rend; ++rit) {
      // All projected fields of one table; for `select * from t1,t2;`
      // old_schema=[t1.a, t1.b, t2.a, t2.b]
      old_schema.append(rit->get_schema());
    }

    std::vector<int> select_order;
    //TODO Following the output column order, add the matching old_schema fields to join_schema and build the select_order array
    // for `select *`, add every field
    // for `select t1.*`, add the fields whose table name matches
    // for `select t1.age`, add the fields whose table name and field name match
      print_tuples.set_schema(join_schema);

    // Build the join conditions; each one needs to locate its table.
    // C x 3 array
    // each entry is (index of the left attribute in the new schema, CompOp, index of the right attribute in the new schema)
    std::vector<std::vector<int>> condition_idxs;
    for (size_t i = 0; i < selects.condition_num; i++) {
      const Condition &condition = selects.conditions[i];
      if (condition.left_is_attr == 1 &&
          condition.right_is_attr == 1) {
        std::vector<int> temp_con;
        const char *l_table_name = condition.left_attr.relation_name;
        const char *l_field_name = condition.left_attr.attribute_name;
        const CompOp comp = condition.comp;
        const char *r_table_name = condition.right_attr.relation_name;
        const char *r_field_name = condition.right_attr.attribute_name;
        temp_con.push_back(print_tuples.get_schema().index_of_field(
                l_table_name, l_field_name));
        temp_con.push_back(comp);
        temp_con.push_back(print_tuples.get_schema().index_of_field(
                r_table_name, r_field_name));
        condition_idxs.push_back(temp_con);
      }
    }
    //TODO Concatenating the tuples requires computing the Cartesian product
    //TODO Add the tuples that satisfy the join conditions to print_tuples

      print_tuples.print(ss);
    } else {
      // Only one table is queried: print its result directly.
      tuple_sets.front().print(ss);
    }
    for (SelectExeNode *&tmp_node: select_nodes) {
      delete tmp_node;
    }
    session_event->set_response(ss.str());
    end_trx_if_need(session, trx, true);
    return rc;
}

先看几个关键数据结构,解决一些关键问题:

TupleSet、Tuple、TupleValue

可以简单把这三个类和关系型数据里的表概念结合,TupleSet可理解成表,Tuple理解成表的一行(一个元组)、TupleValue表示某行某列上的值

  • TupleSet包含两个属性:tuples_ 和 schema_。schema_ 记录了表的列信息,tuples_ 记录了具体的数据

  • Tuple包含一个属性:values_,记录了一行中所有的值,每个值都是一个TupleValue

Query

SQL查询语句的语法解析部分已经写好,解析后的SQL信息被记录在Query *sql中,看一下Query类的声明

// Result of parsing one SQL statement.
typedef struct Query {
  enum SqlCommandFlag flag;  // statement kind, e.g. SCF_SELECT for a SELECT
  union Queries sstr;        // statement details; which member is valid depends on flag
} Query;

SqlCommandFlag标注当前SQL语句类型,此处一定为SCF_SELECT,即SELECT语句,Queries包含了具体SQL信息

// Parsed details of a statement. Only the member that matches Query::flag is
// meaningful (e.g. `selection` for a SELECT).
union Queries {
  Selects selection;
  Inserts insertion;
  Deletes deletion;
  Updates update;
  CreateTable create_table;
  DropTable drop_table;
  CreateIndex create_index;
  DropIndex drop_index;
  DescTable desc_table;
  LoadData load_data;
  char *errors;
};

不同的SQL类型对应不同类型的信息,但存储方式为union,此处只关心Selects类型

typedef struct {
  size_t attr_num;                // Length of attrs in Select clause
  RelAttr attributes[MAX_NUM];    // attrs in Select clause (stored in reverse order of the SQL text)
  size_t relation_num;            // Length of relations in From clause
  char *relations[MAX_NUM];       // relations in From clause
  size_t condition_num;           // Length of conditions in Where clause
  Condition conditions[MAX_NUM];  // conditions in Where clause
  size_t aggregation_num;         // Length of aggregations in Select clause
  Aggregation aggregations[MAX_NUM];  // aggregate functions in Select clause
} Selects;

记录了attributes、relations、conditions、aggregations,很容易和sql语句对应上,attributes是SELECT的属性、relations是FROM中连接的表,conditions是WHERE后的条件,aggregations暂时还没用到,例如sql语句

SELECT t1.id, t2.age from t1, t2 where t1.id=t2.id and t2.age > 10;

以下都以该语句为例子,这里一个一个说:

  • attributes

    typedef struct {
      char *relation_name;   // relation (table) name; may be NULL
      char *attribute_name;  // attribute (column) name
    } RelAttr;
    

    image-20220411185856616

    需要说明的是,如果是SELECT *的情况,对应的relation_name和attribute_name都为NULL

    但如果是SELECT t1.*的情况,则relation_name=”t1”, attribute_name=”*”

    另外,很重要的一点是attributes的存储是逆序的,需要注意到这一点才能按顺序正确输出

  • relations

    直接用字符串记录表名

    image-20220411190147383

  • conditions

    typedef struct _Condition {
      int left_is_attr;  // TRUE if left-hand side is an attribute
      // 1: the left operand is an attribute name; 0: it is a value
      Value left_value;   // left-hand side value if left_is_attr = FALSE
      RelAttr left_attr;  // left-hand side attribute
      CompOp comp;        // comparison operator
      int right_is_attr;  // TRUE if right-hand side is an attribute
      // 1: the right operand is an attribute name; 0: it is a value
      RelAttr right_attr;  // right-hand side attribute if right_is_attr = TRUE
      Value right_value;   // right-hand side value if right_is_attr = FALSE
    } Condition;
    

    image-20220411190320349

    记录了左右值的表名和属性名、是否为属性以及操作类型:只有当某一侧不是属性时才读取该侧的 value,否则读取该侧的 attr。例如对于 t2.age > 10,right_is_attr=0,说明 10 不是属性而是值,此时读 right_value;left_is_attr=1,此时读 left_attr。

Executor

这部分创建和调用执行器的函数已给出,实际上可以根据返回结果当成黑箱用,可以最后再看具体实现方式。

  • 现在已知sql语义信息被完整包含在Selects中,接下来要创建并调用执行器,对每张连接涉及的表(relations)执行一次,创建结果为一个SelectExeNode,包含以下属性:

    private:
        Trx *trx_ = nullptr;
        Table *table_;
        TupleSchema tuple_schema_;
        std::vector<DefaultConditionFilter *> condition_filters_;
    
    • trx_是一个事务相关的属性
    • table_记录了本次执行对应的table地址
    • tuple_schema_记录了被表里select到的属性schema(如果没有被select到就不包含)
    • condition_filters_记录了选择条件,但涉及多表的选择条件不会被记录在此处,需要在计算笛卡尔积后再进行检验
  • 执行结果为一个TupleSet,相当于一张新表,schema_只包含SELECT中被选择的属性,tuples_只包含满足选择conditions的数据

    private:
        std::vector<Tuple> tuples_;
        TupleSchema schema_;
    

    这里执行结果的schema顺序与sql语句中的查询顺序有关。

    实际上顺序是这样的:首先按表的顺序倒序扫,对于每张表内的schema,顺序与sql语句中的顺序相同。

    例如SELECT t1.id, t2.age, t1.sex, t2.id from t1, t2 where t1.id=t2.id and t2.age > 10;,对应的结果为:

    image-20220411203600197

现在可以拿到【对于每张表,剔除掉不符合要求的行列后生成的新表】,那么接下来的任务比较明确了:

  1. 构建出查询结果虚拟表的schema
  2. 生成表的笛卡尔积
  3. 判断笛卡尔积的结果是否满足多表联查条件
  4. 将满足条件的行添加入输出中

任务一:构建出查询结果虚拟表的schema

我们需要构建出查询结果虚拟表的schema,并且需要按照 selects.attribute 的顺序添加。并构建select_order数组记录顺序。

在实现中,我们首先不考虑 SELECT 的列顺序,把每个表的每个字段依次拼接起来,用于后面的对应和查找。如果查询语句是 select * from t1,t2;,那么构建的 old_schema 应该是 [t1.a, t1.b, t2.a, t2.b] 这种形式。然后我们需要根据 SELECT 选择的内容进行查找和匹配,主要有三种情况:`*`、`t1.*` 和 `t1.age`。请根据注释要求进行匹配,把对应字段添加到 join_schema 中。

因为old_schema本身是由执行出来的tuple_sets拼接出来的,已经只包含SELECT要选的属性了,所以这里主要做的其实仅仅是对old_schema按照SELECT出来的顺序重新排序,并记录这个顺序。

// TODO Following the output column order, add the matching old_schema fields to join_schema and build the select_order array
// for `select *`, add every field
// for `select t1.*`, add the fields whose table name matches
// for `select t1.age`, add the fields whose table name and field name match
// attributes[] is stored in reverse order, hence the backwards walk over i.
// orders[k] records the position in old_schema of the k-th output column.
// NOTE(review): a NULL relation_name matches every field regardless of
// attribute_name, so an unqualified column such as `select id from t1,t2`
// selects all columns of all tables.
std::vector<int> orders;
for (int i = selects.attr_num - 1; i >= 0; i--) {
    for (int j = 0; j < old_schema.fields().size(); j++) {
        if (selects.attributes[i].relation_name == NULL
            || (strcmp(selects.attributes[i].relation_name, old_schema.field(j).table_name()) == 0
                && (strcmp(selects.attributes[i].attribute_name, old_schema.field(j).field_name()) == 0
                    || strcmp(selects.attributes[i].attribute_name, "*") == 0))) {
            join_schema.add(old_schema.field(j));
            orders.push_back(j);
        }
    }
}
print_tuples.set_schema(join_schema);

任务二:判断多表联查条件(execute_stage.cpp)

我们需要根据表的连接关系去除不符合逻辑的数据,在该函数中 res_tuple 是需要进行筛选的某一行,condition_idxs是 C x 3 数组,其每一条的3个元素代表(左值的属性在新schema的下标,CompOp运算符,右值的属性在新schema的下标),那么我们需要筛选表中某一行 res_tuple是否满足多表联查条件即:左值=右值。

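这里的参考实现只处理了相等条件,即 comp == EQUAL_TO。需要注意:只涉及单张表的条件已在执行器中处理,但涉及两张表的属性比较(例如 t1.id < t2.id)不会被执行器过滤(见上文对 condition_filters_ 的说明:涉及多表的选择条件需要在计算笛卡尔积后再检验)。若要正确支持这类查询,需要在这里按 CompOp 处理所有比较运算符。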

// Decides whether a merged row satisfies the multi-table (join) conditions.
// res_tuple: the row being filtered.
// condition_idxs: C x 3 array; each entry is (index of the left attribute in
//   the new schema, CompOp, index of the right attribute in the new schema).
// Returns false as soon as an EQUAL_TO condition has unequal operands.
// NOTE(review): conditions with any other CompOp are not evaluated, and an
// index of -1 (column missing from the schema) is passed to get() unchecked.
bool match_join_condition(const Tuple *res_tuple,
                          const std::vector<std::vector<int>> condition_idxs) {
  // res_tuple is the row to be filtered
  // condition_idxs is a C x 3 array
  // each entry is (index of the left attribute in the new schema, CompOp, index of the right attribute in the new schema)
  //TODO Decide whether res_tuple satisfies the join conditions, i.e. left value = right value

  for (int i = 0; i < condition_idxs.size(); i++) {
      CompOp comp = CompOp(condition_idxs[i][1]);
      const TupleValue &left_value = res_tuple->get(condition_idxs[i][0]);
      const TupleValue &right_value = res_tuple->get(condition_idxs[i][2]);

      // compare() returns non-zero when the two values differ.
      if (comp == EQUAL_TO && left_value.compare(right_value)) {
        return false;
      }
  }
  return true;
}

任务三:多段小元组合成一个大元组(execute_stage.cpp)

这是一个辅助生成笛卡尔积的函数。

这里我们需要把多个元组按照SQL查询的顺序合并成一个元组,我们用orders数组来记录顺序,举例

temp_tuples=[[1,2,3],[4,5,6]] ,而order=[3,4,5,0,1,2] ,那么合并后结果 res_tuple=[4,5,6,1,2,3]

// Merges several partial tuples into one large tuple.
// temp_tuples: iterators to the partial rows, concatenated in this order.
// orders: orders[k] is the position, in the concatenation, of the value that
//   becomes column k of the result.
Tuple merge_tuples(
    const std::vector<std::vector<Tuple>::const_iterator> temp_tuples,
    std::vector<int> orders) {
  std::vector<std::shared_ptr<TupleValue>> temp_res;
  Tuple res_tuple;
  //TODO First put every field into its slot (temp_res)
  //TODO Then append them to the large tuple (res_tuple) in `orders` order
  for (auto iter: temp_tuples) {
      for (auto value: iter->values()) {
          temp_res.push_back(value);
      }
  }
  for (int order: orders) {
      res_tuple.add(temp_res[order]);
  }
  return res_tuple;
}

任务四:实现笛卡尔积(execute_stage.cpp)

比较重点的是如何生成笛卡尔积。思路是先根据每张表的大小确定最终结果的条数,再确定最终结果的每一条分别对应每张小表的哪一条,并保证最终结果没有重复。问题是如何确定第$i$条数据在A、B、C三张表里分别对应哪条数据:可以参考k进制的思路,区别在于每一位的进制都取决于对应表的大小。

例如A、B、C三个表分别有3、4、5条数据,显然最终有$3\times 4\times 5=60$条数据。用三位数分别表示三张表的行号,从000开始逐渐增加:000,001,002,…,010,…,100,…,最后为234。如果当前为第$i$条结果,第一张表对应的行为$\lfloor i / (4\times 5) \rfloor \bmod 3$,第二张表对应的行为$\lfloor i / 5 \rfloor \bmod 4$,第三张表对应的行为$i \bmod 5$。这个计算方法和求k进制某一位上的值几乎一样,只要注意不同位对应不同的进制。

//TODO 元组的拼接需要实现笛卡尔积
//TODO 将符合连接条件的元组添加到print_tables中
int tuple_number = 1;
std::vector<int> rec_tot(tuple_sets.size());
for (int i = tuple_sets.size() - 1; i >= 0; i--) {
    auto rit = &tuple_sets[i];
    rec_tot[i] = tuple_number;
    tuple_number *= rit->size();
}
for (int i = 0; i < tuple_number; i++) {
    std::vector<std::vector<Tuple>::const_iterator> vec;
    for (int j = tuple_sets.size() - 1; j >= 0; j--) {
        auto rit = &tuple_sets[j];
        vec.push_back(rit->tuples().begin() + (i / rec_tot[j]) % rit->size());
    }
    Tuple merge_res = merge_tuples(vec, orders);
    bool c = match_join_condition(&merge_res, condition_idxs);
    if (c)
        print_tuples.add(std::move(merge_res));

聚合函数

/* Copyright (c) 2021 Xie Meiyi(xiemeiyi@hust.edu.cn) and OceanBase and/or its
affiliates. All rights reserved. miniob is licensed under Mulan PSL v2. You can
use this software according to the terms and conditions of the Mulan PSL v2. You
may obtain a copy of Mulan PSL v2 at: http://license.coscl.org.cn/MulanPSL2 THIS
SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */

//
// Created by Longda on 2021/4/13.
//

#include "execute_stage.h"

#include <sstream>
#include <string>

#include "common/io/io.h"
#include "common/lang/string.h"
#include "common/log/log.h"
#include "common/seda/timer_stage.h"
#include "event/execution_plan_event.h"
#include "event/session_event.h"
#include "event/sql_event.h"
#include "event/storage_event.h"
#include "session/session.h"
#include "sql/executor/execution_node.h"
#include "sql/executor/tuple.h"
#include "storage/common/condition_filter.h"
#include "storage/common/table.h"
#include "storage/default/default_handler.h"
#include "storage/trx/trx.h"

using namespace common;

RC create_selection_executor(Trx *trx, const Selects &selects, const char *db,
                             const char *table_name,
                             SelectExeNode &select_node);

//! Constructor: the stage tag is simply forwarded to the Stage base class.
ExecuteStage::ExecuteStage(const char *tag)
    : Stage(tag) {
}

//! Destructor. Nothing is released explicitly here.
ExecuteStage::~ExecuteStage() = default;

//! Factory: allocates an ExecuteStage named `tag` and loads its properties.
//! Returns nullptr (after logging) when the allocation fails.
Stage *ExecuteStage::make_stage(const std::string &tag) {
  auto *created = new (std::nothrow) ExecuteStage(tag.c_str());
  if (!created) {
    LOG_ERROR("new ExecuteStage failed");
    return nullptr;
  }

  created->set_properties();
  return created;
}

//! Set properties for this object set in stage specific properties.
//! No stage-specific properties are read yet, so this always reports success.
bool ExecuteStage::set_properties() {
  const bool loaded = true;
  return loaded;
}

//! Initialize stage params and validate outputs
bool ExecuteStage::initialize() {
  LOG_TRACE("Enter");

  std::list<Stage *>::iterator stgp = next_stage_list_.begin();
  default_storage_stage_ = *(stgp++);
  mem_storage_stage_ = *(stgp++);

  LOG_TRACE("Exit");
  return true;
}

//! Cleanup after disconnection. There is no per-connection state to release,
//! so only the trace markers are emitted.
void ExecuteStage::cleanup() {
  LOG_TRACE("Enter");
  LOG_TRACE("Exit");
}

// Stage entry point: every incoming event is processed by handle_request.
void ExecuteStage::handle_event(StageEvent *event) {
  LOG_TRACE("Enter\n");
  handle_request(event);
  LOG_TRACE("Exit\n");
}

// Completion callback for an ExecutionPlanEvent. No post-processing happens in
// this stage; the SQL event the plan was built from is completed immediately.
// `context` is not used.
void ExecuteStage::callback_event(StageEvent *event, CallbackContext *context) {
  LOG_TRACE("Enter\n");

  auto *plan_event = static_cast<ExecutionPlanEvent *>(event);
  plan_event->sql_event()->done_immediate();

  LOG_TRACE("Exit\n");
}

// Dispatches one parsed SQL statement.
//
// SELECT, SYNC, BEGIN/COMMIT/ROLLBACK, HELP and EXIT are answered directly in
// this stage. INSERT/UPDATE/DELETE, the DDL commands, SHOW TABLES, DESC TABLE
// and LOAD DATA are wrapped in a StorageEvent and handed to the default storage
// stage. A CompletionCallback bound to this stage is pushed onto the event
// first (presumably so that callback_event runs when the event completes).
void ExecuteStage::handle_request(common::StageEvent *event) {
  ExecutionPlanEvent *exe_event = static_cast<ExecutionPlanEvent *>(event);
  SessionEvent *session_event = exe_event->sql_event()->session_event();
  Query *sql = exe_event->sqls();
  const char *current_db =
      session_event->get_client()->session->get_current_db().c_str();

  CompletionCallback *cb = new (std::nothrow) CompletionCallback(this, nullptr);
  if (cb == nullptr) {
    LOG_ERROR("Failed to new callback for ExecutionPlanEvent");
    exe_event->done_immediate();
    return;
  }
  exe_event->push_callback(cb);

  switch (sql->flag) {
    case SCF_SELECT: {  // select
      RC rc =
          do_select(current_db, sql, exe_event->sql_event()->session_event());
      if (rc != RC::SUCCESS) {
        session_event->set_response("FAILURE\n");
      }
      exe_event->done_immediate();
    } break;

    case SCF_INSERT:
    case SCF_UPDATE:
    case SCF_DELETE:
    case SCF_CREATE_TABLE:
    case SCF_SHOW_TABLES:
    case SCF_DESC_TABLE:
    case SCF_DROP_TABLE:
    case SCF_CREATE_INDEX:
    case SCF_DROP_INDEX:
    case SCF_LOAD_DATA: {
      StorageEvent *storage_event = new (std::nothrow) StorageEvent(exe_event);
      if (storage_event == nullptr) {
        LOG_ERROR("Failed to new StorageEvent");
        event->done_immediate();
        return;
      }

      default_storage_stage_->handle_event(storage_event);
    } break;
    case SCF_SYNC: {
      RC rc = DefaultHandler::get_default().sync();
      session_event->set_response(strrc(rc));
      exe_event->done_immediate();
    } break;
    case SCF_BEGIN: {
      // Keep the transaction open across statements until COMMIT/ROLLBACK.
      session_event->get_client()->session->set_trx_multi_operation_mode(true);
      session_event->set_response(strrc(RC::SUCCESS));
      exe_event->done_immediate();
    } break;
    case SCF_COMMIT: {
      Trx *trx = session_event->get_client()->session->current_trx();
      RC rc = trx->commit();
      session_event->get_client()->session->set_trx_multi_operation_mode(false);
      session_event->set_response(strrc(rc));
      exe_event->done_immediate();
    } break;
    case SCF_ROLLBACK: {
      Trx *trx = session_event->get_client()->session->current_trx();
      RC rc = trx->rollback();
      session_event->get_client()->session->set_trx_multi_operation_mode(false);
      session_event->set_response(strrc(rc));
      exe_event->done_immediate();
    } break;
    case SCF_HELP: {
      const char *response =
          "show tables;\n"
          "desc `table name`;\n"
          "create table `table name` (`column name` `column type`, ...);\n"
          "create index `index name` on `table` (`column`);\n"
          "insert into `table` values(`value1`,`value2`);\n"
          "update `table` set column=value [where `column`=`value`];\n"
          "delete from `table` [where `column`=`value`];\n"
          "select [ * | `columns` ] from `table`;\n";
      session_event->set_response(response);
      exe_event->done_immediate();
    } break;
    case SCF_EXIT: {
      // do nothing
      const char *response = "Unsupported\n";
      session_event->set_response(response);
      exe_event->done_immediate();
    } break;
    default: {
      // Log before completing the event: done_immediate() hands the event
      // back to the pipeline, after which the parsed query reached through it
      // may no longer be valid.
      LOG_ERROR("Unsupported command=%d\n", sql->flag);
      exe_event->done_immediate();
    }
  }
}

// Finishes the session's current transaction after a single statement.
//
// When the session is in multi-operation mode (set by BEGIN), the transaction
// is left open for an explicit COMMIT or ROLLBACK. Otherwise it is committed
// when `all_right` is true and rolled back when it is false. A failing
// commit/rollback is logged instead of being silently ignored.
void end_trx_if_need(Session *session, Trx *trx, bool all_right) {
  if (session->is_trx_multi_operation_mode()) {
    return;
  }

  if (all_right) {
    RC rc = trx->commit();
    if (rc != RC::SUCCESS) {
      LOG_WARN("Failed to commit trx. rc=%d:%s", static_cast<int>(rc), strrc(rc));
    }
  } else {
    RC rc = trx->rollback();
    if (rc != RC::SUCCESS) {
      LOG_WARN("Failed to rollback trx. rc=%d:%s", static_cast<int>(rc), strrc(rc));
    }
  }
}

// Builds the column title of an aggregation, e.g. "MAX(t1.id)" or "COUNT(1)".
//
// The function name comes from agg.func_name. The argument is either a literal
// (agg.is_value == 1; INTS and FLOATS are printed, DATES prints the word
// "DATES", other types print nothing) or an optional "relation." prefix
// followed by the attribute name.
std::string agg_to_string(Aggregation agg) {
  std::string title;

  switch (agg.func_name) {
    case AGG_MAX:   title.append("MAX");   break;
    case AGG_MIN:   title.append("MIN");   break;
    case AGG_COUNT: title.append("COUNT"); break;
    case AGG_AVG:   title.append("AVG");   break;
  }

  title.append("(");
  if (agg.is_value != 1) {
    const char *relation = agg.attribute.relation_name;
    if (relation != NULL) {
      title.append(relation).append(".");
    }
    title.append(agg.attribute.attribute_name);
  } else {
    void *literal = agg.value->data;
    switch (agg.value->type) {
      case INTS:
        title.append(std::to_string(*(int *)literal));
        break;
      case FLOATS:
        title.append(std::to_string(*(float *)literal));
        break;
      case DATES:
        title.append("DATES");
        break;
      case UNDEFINED:
      case CHARS:
        break;
    }
  }
  title.append(")");
  return title;
}

void aggregation_exec(const Selects &selects, TupleSet *res_tuples) {
  if (selects.aggregation_num > 0) {
    TupleSchema agg_schema;
    //TODO 设置schema
    //TODO 依次添加字段值
    for (size_t i = 0; i < selects.aggregation_num; i++) {
        const Aggregation &agg = selects.aggregations[i];
        int idx = -1;
        auto &temp = res_tuples->get_schema();
        auto &fields = res_tuples->get_schema().fields();
        for (int i = 0; i < res_tuples->schema().fields().size(); i++) {
            if (agg.attribute.attribute_name == NULL) continue; // 处理count(1)情况
            if ( (agg.attribute.relation_name == NULL || strcmp(agg.attribute.relation_name, fields[i].table_name()) == 0) &&
            (strcmp(agg.attribute.attribute_name, fields[i].field_name()) == 0) ) {
                idx = i;
                break;
            }
        }
        if (idx == -1) {
            agg_schema.add(INTS, "", agg_to_string(agg).c_str());
        } else {
            auto tupleField = res_tuples->get_schema().field(idx);
            agg_schema.add(tupleField.type(), tupleField.table_name(), agg_to_string(agg).c_str());
        }
    }
    Tuple out;
    for (size_t i = 0; i < selects.aggregation_num; i++) {
      const Aggregation &agg = selects.aggregations[i];
      const std::vector<Tuple> &tuples = res_tuples->tuples();
//      TupleValue tupleValue;
        int idx = -1;
        auto fields = res_tuples->get_schema().fields();
        for (int i = 0; i < res_tuples->schema().fields().size(); i++) {
            if (agg.attribute.attribute_name == NULL) continue; // 处理count(1)情况
            if ( (agg.attribute.relation_name == NULL || strcmp(agg.attribute.relation_name, fields[i].table_name()) == 0) &&
                 (strcmp(agg.attribute.attribute_name, fields[i].field_name()) == 0) ) {
                idx = i;
                break;
            }
        }
        AttrType attrType;
        if (idx != -1) {
            attrType = res_tuples->get_schema().field(idx).type();
        }
        switch (agg.func_name) {

        case FuncName::AGG_MAX: {
            switch (attrType) {
                case INTS: {
                    int mx = -999999999;
                    for (auto &tuple: tuples) {
                        int val = ((IntValue *) tuple.get_pointer(idx).get())->get_value();
                        mx = std::max(mx, val);
                    }
                    out.add(mx);
                    break;
                }
                case FLOATS: {
                    float mx = -999999999;
                    for (auto &tuple: tuples) {
                        float val = ((FloatValue *) tuple.get_pointer(idx).get())->get_value();
                        float mx = std::max(mx, val);
                    }
                    out.add(mx);
                    break;
                }
            }
            break;
        }
        case FuncName::AGG_MIN: {
          //TODO 遍历所有元组,获取最值

            switch (attrType) {
                case INTS: {
                    int mn = 999999999;
                    for (auto &tuple: tuples) {
                        int val = ((IntValue *) tuple.get_pointer(idx).get())->get_value();
                        mn = std::min(mn, val);
                    }
                    out.add(mn);
                    break;
                }
                case FLOATS: {
                    float mn = 999999999;
                    for (auto &tuple: tuples) {
                        float val = ((FloatValue *) tuple.get_pointer(idx).get())->get_value();
                        float mn = std::min(mn, val);
                    }
                    out.add(mn);
                    break;
                }
            }
            break;
        //TODO 增加这条记录
        }
        case FuncName::AGG_COUNT: {
          // 值为size的大小
          // count(1) or count(id)?
          if (idx == -1) {
              out.add((int)tuples.size());
          } else {
              switch (attrType) {
                  case INTS: {
                      int cnt = 0;
                      for (auto &tuple: tuples) {
                          int val = ((IntValue *) tuple.get_pointer(idx).get())->get_value();
                          if (val != NULL) cnt++;
                      }
                      out.add(cnt);
                      break;
                  }
                  case FLOATS: {
                      int cnt = 0;
                      for (auto &tuple: tuples) {
                          float val = ((FloatValue *) tuple.get_pointer(idx).get())->get_value();
                          if (val != NULL) cnt++;
                      }
                      out.add(cnt);
                      break;
                  }
              }

          }
          //TODO 增加这条记录
          break;
        }
        case FuncName::AGG_AVG: {
            //TODO 遍历所有元组,获取和
            float sum = 0;
            switch (attrType) {
                case INTS: {
                    for (auto &tuple: tuples) {
                        int val = ((IntValue *) tuple.get_pointer(idx).get())->get_value();
                        sum += val;
                    }
                    break;
                }
                case FLOATS: {
                    for (auto &tuple: tuples) {
                        float val = ((FloatValue *) tuple.get_pointer(idx).get())->get_value();
                        sum += val;
                    }
                    break;
                }
            }
            //TODO 增加这条记录
            out.add(sum / tuples.size());
            break;
        }
      }
    }
    //等所有值都计算完再去清除res
    res_tuples->clear();
    res_tuples->set_schema(agg_schema);
    res_tuples->add(std::move(out));
  }
  return;
}

// 需要满足多表联查条件
bool match_join_condition(const Tuple *res_tuple,
                          const std::vector<std::vector<int>> condition_idxs) {
    // res_tuple 是 需要进行筛选的某一行
    // condition_idxs 是 C x 3 数组
    // 每一条的3个元素代表(左值的属性在新schema的下标,CompOp运算符,右值的属性在新schema的下标)
    //TODO 判断表中某一行 res_tuple 是否满足多表联查条件即:左值=右值
    for (int i = 0; i < condition_idxs.size(); i++) {
        CompOp comp = CompOp(condition_idxs[i][1]);
        const TupleValue &left_value = res_tuple->get(condition_idxs[i][0]);
        const TupleValue &right_value = res_tuple->get(condition_idxs[i][2]);
        if (comp == EQUAL_TO && left_value.compare(right_value)) {
            return false;
        }
    }
    return true;
}
// Concatenates the values of several partial rows (in the order given) and
// emits them in `orders` order: orders[k] is the position, within the
// concatenation, of the value that becomes column k of the merged row.
//
// Example: pieces [[1,2,3],[4,5,6]] with orders [3,4,5,0,1,2] give
// [4,5,6,1,2,3].
Tuple merge_tuples(
        const std::vector<std::vector<Tuple>::const_iterator> temp_tuples,
        std::vector<int> orders) {
  std::vector<std::shared_ptr<TupleValue>> flat_values;
  for (const auto &piece : temp_tuples) {
    const auto &piece_values = piece->values();
    flat_values.insert(flat_values.end(), piece_values.begin(), piece_values.end());
  }

  Tuple merged;
  for (std::size_t k = 0; k < orders.size(); ++k) {
    merged.add(flat_values[orders[k]]);
  }
  return merged;
}

// Executes a SELECT statement and stores the printed result in the session
// response.
//
// One SelectExeNode is built per table in the FROM clause; each applies the
// conditions that involve only its own table and projects the selected
// columns. With several tables, the per-table results are combined as a
// Cartesian product, filtered by the attribute-to-attribute conditions, and
// reordered to follow the select list. Aggregations are applied once to the
// final rows, and the result is printed once.
//
// NOTE: selected column names and WHERE column names are not fully validated
// here; that validation could also live in the resolve stage.
RC ExecuteStage::do_select(const char *db, const Query *sql,
                           SessionEvent *session_event) {
  RC rc = RC::SUCCESS;
  Session *session = session_event->get_client()->session;
  Trx *trx = session->current_trx();
  const Selects &selects = sql->sstr.selection;

  // Build the bottom-level select node of every table, together with the
  // conditions that only reference that table.
  std::vector<SelectExeNode *> select_nodes;
  for (size_t i = 0; i < selects.relation_num; i++) {
    const char *table_name = selects.relations[i];
    SelectExeNode *select_node = new SelectExeNode;
    rc = create_selection_executor(trx, selects, db, table_name, *select_node);
    if (rc != RC::SUCCESS) {
      delete select_node;
      for (SelectExeNode *&tmp_node : select_nodes) {
        delete tmp_node;
      }
      end_trx_if_need(session, trx, false);
      return rc;
    }
    select_nodes.push_back(select_node);
  }
  if (select_nodes.empty()) {
    LOG_ERROR("No table given");
    end_trx_if_need(session, trx, false);
    return RC::SQL_SYNTAX;
  }

  std::vector<TupleSet> tuple_sets;
  for (SelectExeNode *&node : select_nodes) {
    TupleSet tuple_set;
    rc = node->execute(tuple_set);
    if (rc != RC::SUCCESS) {
      for (SelectExeNode *&tmp_node : select_nodes) {
        delete tmp_node;
      }
      end_trx_if_need(session, trx, false);
      return rc;
    }
    tuple_sets.push_back(std::move(tuple_set));
  }

  std::stringstream ss;
  if (tuple_sets.size() > 1) {
    // Several tables: join them.
    TupleSet print_tuples;

    // Concatenate the projected schemas. tuple_sets is walked in reverse so
    // this layout matches the order in which the row pieces are passed to
    // merge_tuples below. E.g. for `select * from t1,t2`:
    // old_schema = [t1.a, t1.b, t2.a, t2.b].
    TupleSchema old_schema;
    for (auto rit = tuple_sets.crbegin(); rit != tuple_sets.crend(); ++rit) {
      old_schema.append(rit->get_schema());
    }

    // Reorder old_schema to follow the select list (attributes[] is stored in
    // reverse, hence the backwards walk). A column is taken when its table
    // matches (or no table was given) and its field matches (or the select
    // item is `*`). orders[k] is the old_schema position of output column k.
    TupleSchema join_schema;
    std::vector<int> orders;
    const int old_field_count = static_cast<int>(old_schema.fields().size());
    for (int i = static_cast<int>(selects.attr_num) - 1; i >= 0; i--) {
      const RelAttr &attr = selects.attributes[i];
      const bool any_field =
          attr.attribute_name == nullptr || strcmp(attr.attribute_name, "*") == 0;
      for (int j = 0; j < old_field_count; j++) {
        const auto &field = old_schema.field(j);
        const bool table_ok = attr.relation_name == nullptr ||
                              strcmp(attr.relation_name, field.table_name()) == 0;
        const bool field_ok =
            any_field || strcmp(attr.attribute_name, field.field_name()) == 0;
        if (table_ok && field_ok) {
          join_schema.add(field);
          orders.push_back(j);
        }
      }
    }
    print_tuples.set_schema(join_schema);

    // Position of a condition column in the output schema, or -1 when it is
    // not part of it. Without a table name, the first column with that field
    // name is used (this also avoids strcmp on a null table name).
    const auto &out_fields = join_schema.fields();
    auto column_index = [&out_fields](const RelAttr &attr) -> int {
      if (attr.attribute_name == nullptr) {
        return -1;
      }
      for (size_t k = 0; k < out_fields.size(); k++) {
        const bool table_ok = attr.relation_name == nullptr ||
                              strcmp(attr.relation_name, out_fields[k].table_name()) == 0;
        if (table_ok && strcmp(attr.attribute_name, out_fields[k].field_name()) == 0) {
          return static_cast<int>(k);
        }
      }
      return -1;
    };

    // C x 3 table of join conditions: (left column index in the output
    // schema, CompOp, right column index in the output schema).
    std::vector<std::vector<int>> condition_idxs;
    for (size_t i = 0; i < selects.condition_num; i++) {
      const Condition &condition = selects.conditions[i];
      if (condition.left_is_attr == 1 && condition.right_is_attr == 1) {
        std::vector<int> temp_con;
        temp_con.push_back(column_index(condition.left_attr));
        temp_con.push_back(static_cast<int>(condition.comp));
        temp_con.push_back(column_index(condition.right_attr));
        condition_idxs.push_back(temp_con);
      }
    }

    // Enumerate the Cartesian product as a mixed-radix counter: strides[k] is
    // the product of the sizes of tuple_sets[k+1..], so product row `row`
    // takes tuple (row / strides[k]) % size(k) from tuple_sets[k]. If any
    // input is empty the product is empty and only the header is printed.
    const size_t set_count = tuple_sets.size();
    std::vector<size_t> strides(set_count);
    size_t total_rows = 1;
    for (size_t k = set_count; k-- > 0;) {
      strides[k] = total_rows;
      total_rows *= tuple_sets[k].tuples().size();
    }

    std::vector<std::vector<Tuple>::const_iterator> pieces;
    pieces.reserve(set_count);
    for (size_t row = 0; row < total_rows; row++) {
      pieces.clear();
      for (size_t k = set_count; k-- > 0;) {
        const std::vector<Tuple> &tuples = tuple_sets[k].tuples();
        const size_t offset = (row / strides[k]) % tuples.size();
        pieces.push_back(tuples.begin() + static_cast<std::ptrdiff_t>(offset));
      }
      Tuple merged = merge_tuples(pieces, orders);
      if (match_join_condition(&merged, condition_idxs)) {
        print_tuples.add(std::move(merged));
      }
    }

    // Aggregate and print once, after the whole join has been produced.
    aggregation_exec(selects, &print_tuples);
    print_tuples.print(ss);
  } else {
    // Only one table: its node already produced the final rows.
    aggregation_exec(selects, &tuple_sets.front());
    tuple_sets.front().print(ss);
  }

  for (SelectExeNode *&tmp_node : select_nodes) {
    delete tmp_node;
  }
  session_event->set_response(ss.str());
  end_trx_if_need(session, trx, true);
  return rc;
}


// Decides whether a table name taken from a condition refers to
// `table_name_to_match`.
//  - An explicit name (non-null) must be byte-for-byte equal.
//  - An omitted name (nullptr) is only treated as a match when the query's
//    FROM list contains exactly one relation.
bool match_table(const Selects &selects, const char *table_name_in_condition,
                 const char *table_name_to_match) {
  if (table_name_in_condition == nullptr) {
    return selects.relation_num == 1;
  }
  return strcmp(table_name_in_condition, table_name_to_match) == 0;
}

// Looks up `field_name` in `table`'s metadata and appends it (type, table name,
// field name) to `schema` via add_if_not_exists.
// Returns RC::SUCCESS on success; logs a warning and returns
// RC::SCHEMA_FIELD_MISSING when the table has no such field.
static RC schema_add_field(Table *table, const char *field_name,
                           TupleSchema &schema) {
  const FieldMeta *meta = table->table_meta().field(field_name);
  if (meta != nullptr) {
    schema.add_if_not_exists(meta->type(), table->name(), meta->name());
    return RC::SUCCESS;
  }
  LOG_WARN("No such field. %s.%s", table->name(), field_name);
  return RC::SCHEMA_FIELD_MISSING;
}

// Returns RC::SUCCESS when `relation_name` is nullptr (no table qualifier) or
// names a table that exists in `db`; otherwise logs a warning and returns
// RC::SCHEMA_TABLE_NOT_EXIST.
static RC check_table_exists_in_db(const char *db, const char *relation_name) {
  if (relation_name == nullptr) {
    return RC::SUCCESS;
  }
  if (nullptr == DefaultHandler::get_default().find_table(db, relation_name)) {
    LOG_WARN("No such table [%s] in db [%s]", relation_name, db);
    return RC::SCHEMA_TABLE_NOT_EXIST;
  }
  return RC::SUCCESS;
}

// Checks every table qualifier used by `selects` (projected attributes,
// aggregate arguments, and each attribute operand of the WHERE conditions)
// against the tables of `db`. Returns the first failure, or RC::SUCCESS.
static RC check_select_tables_exist(const Selects &selects, const char *db) {
  RC rc = RC::SUCCESS;

  for (size_t i = 0; i < selects.attr_num; i++) {
    rc = check_table_exists_in_db(db, selects.attributes[i].relation_name);
    if (rc != RC::SUCCESS) {
      return rc;
    }
  }

  for (int i = 0; i < static_cast<int>(selects.aggregation_num); i++) {
    if (1 == selects.aggregations[i].is_value) {
      continue;  // value argument: no table qualifier to check
    }
    rc = check_table_exists_in_db(
        db, selects.aggregations[i].attribute.relation_name);
    if (rc != RC::SUCCESS) {
      return rc;
    }
  }

  for (size_t i = 0; i < selects.condition_num; i++) {
    const Condition &condition = selects.conditions[i];
    // Each side is checked independently; only attribute operands carry a
    // table name (value operands leave left_attr/right_attr unused).
    if (condition.left_is_attr == 1) {
      rc = check_table_exists_in_db(db, condition.left_attr.relation_name);
      if (rc != RC::SUCCESS) {
        return rc;
      }
    }
    if (condition.right_is_attr == 1) {
      rc = check_table_exists_in_db(db, condition.right_attr.relation_name);
      if (rc != RC::SUCCESS) {
        return rc;
      }
    }
  }
  return RC::SUCCESS;
}

// Builds the bottom-level SELECT execution node for one table of the FROM list:
//  1. validates every table name referenced by the query,
//  2. collects the columns of `table_name` needed by the projection (or, for an
//     aggregate-only query, by the aggregate arguments),
//  3. attaches the conditions that can be evaluated on this table alone
//     (value/value, attr/value, value/attr, or attr/attr on this same table).
// Returns the first error encountered, or the result of select_node.init.
RC create_selection_executor(Trx *trx, const Selects &selects, const char *db,
                             const char *table_name,
                             SelectExeNode &select_node) {
  RC rc = check_select_tables_exist(selects, db);
  if (rc != RC::SUCCESS) {
    return rc;
  }

  // Columns of this table required by the query.
  TupleSchema schema;
  Table *table = DefaultHandler::get_default().find_table(db, table_name);
  if (nullptr == table) {
    LOG_WARN("No such table [%s] in db [%s]", table_name, db);
    return RC::SCHEMA_TABLE_NOT_EXIST;
  }

  if (selects.aggregation_num > 0 && selects.attr_num == 0) {
    // Aggregate-only query, count/min/max/avg(PARAMETER): select the PARAMETER
    // columns directly.
    for (int i = selects.aggregation_num - 1; i >= 0; i--) {
      if (1 == selects.aggregations[i].is_value) {
        // Value argument: no specific column, take every column of the table.
        TupleSchema::from_table(table, schema);
        break;  // NOTE: other columns listed alongside are not validated
      }
      const RelAttr &attr = selects.aggregations[i].attribute;
      if (nullptr == attr.relation_name ||
          0 == strcmp(table_name, attr.relation_name)) {
        if (0 == strcmp("*", attr.attribute_name)) {
          TupleSchema::from_table(table, schema);  // all columns
          break;
        }
        rc = schema_add_field(table, attr.attribute_name, schema);
        if (rc != RC::SUCCESS) {
          return rc;
        }
      }
    }
  } else {  // ordinary projection
    for (int i = selects.attr_num - 1; i >= 0; i--) {
      const RelAttr &attr = selects.attributes[i];
      if (nullptr == attr.relation_name ||
          0 == strcmp(table_name, attr.relation_name)) {
        if (0 == strcmp("*", attr.attribute_name)) {
          TupleSchema::from_table(table, schema);  // all columns
          break;  // NOTE: columns listed after "*" are not validated
        }
        rc = schema_add_field(table, attr.attribute_name, schema);
        if (rc != RC::SUCCESS) {
          return rc;
        }
      }
    }
  }

  // Conditions that reference only this table, or only constant values.
  std::vector<DefaultConditionFilter *> condition_filters;
  for (size_t i = 0; i < selects.condition_num; i++) {
    const Condition &condition = selects.conditions[i];
    if ((condition.left_is_attr == 0 &&
         condition.right_is_attr == 0) ||  // value vs value
        (condition.left_is_attr == 1 && condition.right_is_attr == 0 &&
         match_table(selects, condition.left_attr.relation_name,
                     table_name)) ||  // attribute vs value
        (condition.left_is_attr == 0 && condition.right_is_attr == 1 &&
         match_table(selects, condition.right_attr.relation_name,
                     table_name)) ||  // value vs attribute
        (condition.left_is_attr == 1 && condition.right_is_attr == 1 &&
         match_table(selects, condition.left_attr.relation_name, table_name) &&
         match_table(selects, condition.right_attr.relation_name,
                     table_name))  // attribute vs attribute, both on this table
    ) {
      DefaultConditionFilter *condition_filter = new DefaultConditionFilter();
      rc = condition_filter->init(*table, condition);
      if (rc != RC::SUCCESS) {
        delete condition_filter;
        for (DefaultConditionFilter *&filter : condition_filters) {
          delete filter;
        }
        return rc;
      }
      condition_filters.push_back(condition_filter);
    }
  }

  return select_node.init(trx, table, std::move(schema),
                          std::move(condition_filters));
}