创建Connector时,利用反射创建ProtocolHandler,NIO是Http11NioProtocol。

Connector初始化后,调用start方法开启。 Connector的startInternal()方法,会调用protocolHandler.start(); protocolHandler中会调用endpoint.start(),从而达到开启endpoint、监听端口、读写Socket的目的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class Http11NioProtocol extends AbstractHttp11JsseProtocol<NioChannel> {
}

public abstract class AbstractHttp11JsseProtocol<S>
        extends AbstractHttp11Protocol<S> {
}

public abstract class AbstractHttp11Protocol<S> extends AbstractProtocol<S> {
}

public abstract class AbstractProtocol<S> implements ProtocolHandler,
        MBeanRegistration {
    @Override
    public void start() throws Exception {
        if (getLog().isInfoEnabled())
            getLog().info(sm.getString("abstractProtocolHandler.start",
                    getName()));
        try {
            endpoint.start();
        } catch (Exception ex) {
            getLog().error(sm.getString("abstractProtocolHandler.startError",
                    getName()), ex);
            throw ex;
        }
    }
}

NIOEndpoint包括几个部分:

  • LimitLatch是连接控制器,AbstractEndpoint使用它负责维护连接数的计算,nio模式下默认是10000,达到这个阈值后,就会拒绝连接请求。
  • Acceptor负责接收连接,默认是1个线程来执行,将请求的事件PollerEvent注册到事件列表events
  • Poller来负责轮询上述产生的事件。Poller线程数量是cpu的核数Math.min(2,Runtime.getRuntime().availableProcessors())。Poller.run()不断循环,调用events()方法,将事件注册到Selector。Poller.run()调用selector.select()将就绪的事件生成SocketProcessor,然后交给Excutor去执行。
  • SocketProcessor继承了SocketProcessorBase,实现了Runnable接口,可以提交给线程池Excutor来执行。它里面的doRun()方法,封装了读写Socket、完成Container调用的逻辑
  • Excutor线程池是一个Tomcat线程池。用来执行Poller创建的SocketProcessor。Excutor线程池的大小就是我们在Connector节点配置的maxThreads的值。
Read on →

一个典型的SpringBoot应用入口

1
2
3
4
5
6
@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

@SpringBootApplication注解

1
2
3
4
5
6
7
8
9
10
11
12
13
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@SpringBootConfiguration
@EnableAutoConfiguration
@ComponentScan(excludeFilters = {
    @Filter(type = FilterType.CUSTOM, classes = TypeExcludeFilter.class),
    @Filter(type = FilterType.CUSTOM,
        classes = AutoConfigurationExcludeFilter.class) })
@ConfigurationPropertiesScan
public @interface SpringBootApplication {
}

主要包括@SpringBootConfiguration, @EnableAutoConfiguration, @ComponentScan和@ConfigurationPropertiesScan。要注解发挥作用要先启动。

SpringBoot 启动过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class SpringApplication {
  public static final String DEFAULT_SERVLET_WEB_CONTEXT_CLASS = "org.springframework.boot."
  private List<ApplicationContextInitializer<?>> initializers;
  private List<ApplicationListener<?>> listeners;
  private Set<Class<?>> primarySources;

  public SpringApplication(ResourceLoader resourceLoader, Class<?>... primarySources) {
    this.resourceLoader = resourceLoader;
    Assert.notNull(primarySources, "PrimarySources must not be null");
    this.primarySources = new LinkedHashSet<>(Arrays.asList(primarySources));
    this.webApplicationType = WebApplicationType.deduceFromClasspath();
    setInitializers((Collection) getSpringFactoriesInstances(
        ApplicationContextInitializer.class));
    setListeners((Collection) getSpringFactoriesInstances(ApplicationListener.class));
    this.mainApplicationClass = deduceMainApplicationClass();
  }

  public static ConfigurableApplicationContext run(Class<?> primarySource,
      String... args) {
    return run(new Class<?>[] { primarySource }, args);
  }
}

用户的Application.class会作为primarySource,存到primarySources数组. 后面prepareContext的getAllSources和load会用到。

Read on →

一个简单的http server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
public class HttpServer {
  private boolean shutdown = false;

  public static void main(String[] args) {
    HttpServer server = new HttpServer();
    server.await();
  }

  public void await() {
    ServerSocket serverSocket = null;
    int port = 8080;
    serverSocket =  new ServerSocket(port, 1, InetAddress.getByName("127.0.0.1"));

    while (!shutdown) {
      Socket socket = null;
      InputStream input = null;
      OutputStream output = null;
      socket = serverSocket.accept();
      input = socket.getInputStream();
      output = socket.getOutputStream();

      Request request = new Request(input);
      request.parse();

      Response response = new Response(output);
      response.setRequest(request);

      if (request.getUri().startsWith("/servlet/")) {
        ServletProcessor1 processor = new ServletProcessor1();
        processor.process(request, response);
      } else {
        StaticResourceProcessor processor = new StaticResourceProcessor();
        processor.process(request, response);
      }

      socket.close();

      shutdown = request.getUri().equals(SHUTDOWN_COMMAND);
    }
  }
}

public class Request {
  public void parse() {
    StringBuffer request = new StringBuffer(2048);
    int i;
    byte[] buffer = new byte[2048];
    i = input.read(buffer);
    for (int j=0; j<i; j++) {
      request.append((char) buffer[j]);
    }
    uri = parseUri(request.toString());
  }

  private String parseUri(String requestString) {
    // intput: GET /test HTTP/1.1
    // output: "/test"
  }
}

public class Response {
  public void sendStaticResource() throws IOException {
    byte[] bytes = new byte[BUFFER_SIZE];
    FileInputStream fis = null;
    File file = new File(HttpServer.WEB_ROOT, request.getUri());
    if (file.exists()) {
      fis = new FileInputStream(file);
      int ch = fis.read(bytes, 0, BUFFER_SIZE);
      while (ch!=-1) {
        output.write(bytes, 0, ch);
        ch = fis.read(bytes, 0, BUFFER_SIZE);
      }
    } else {
      String errorMessage = "HTTP/1.1 404 File Not Found\r\n" +
        "Content-Type: text/html\r\n" +
        "Content-Length: 23\r\n" +
        "\r\n" +
        "<h1>File Not Found</h1>";
      output.write(errorMessage.getBytes());
    }
  }
}

public class StaticResourceProcessor {
  public void process(Request request, Response response) {
    response.sendStaticResource();
  }
}

public class ServletProcessor1 {
  public void process(Request request, Response response) {
    String uri = request.getUri();
    String servletName = uri.substring(uri.lastIndexOf("/") + 1);
    URLClassLoader loader = null;

    // Initialize Class Loader, repository is the folder to find servlet classes.
    URL[] urls = new URL[1];
    URLStreamHandler streamHandler = null;
    File classPath = new File(Constants.WEB_ROOT);
    String repository = (new URL("file", null, classPath.getCanonicalPath() + File.separator)).toString();
    urls[0] = new URL(null, repository, streamHandler);
    loader = new URLClassLoader(urls);

    // Load class
    Class myClass = null;
    myClass = loader.loadClass(servletName);

    Servlet servlet = null;
    servlet = (Servlet) myClass.newInstance();
    servlet.service((ServletRequest) request, (ServletResponse) response);
  }
}
Read on →

1
2
3
4
5
6
7
8
9
public class DispatcherServlet extends FrameworkServlet {
  private static final String DEFAULT_STRATEGIES_PATH = "DispatcherServlet.properties";

  static {
    // Load default strategy implementations from properties file.
    ClassPathResource resource = new ClassPathResource(DEFAULT_STRATEGIES_PATH, DispatcherServlet.class);
    defaultStrategies = PropertiesLoaderUtils.loadProperties(resource);
  }
}

resources/org/springframework/web/servlet/DispatcherServlet.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
org.springframework.web.servlet.LocaleResolver=org.springframework.web.servlet.i18n.AcceptHeaderLocaleResolver

org.springframework.web.servlet.ThemeResolver=org.springframework.web.servlet.theme.FixedThemeResolver

org.springframework.web.servlet.HandlerMapping=org.springframework.web.servlet.handler.BeanNameUrlHandlerMapping,</span>
  org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerMapping,</span>
  org.springframework.web.servlet.function.support.RouterFunctionMapping

org.springframework.web.servlet.HandlerAdapter=org.springframework.web.servlet.mvc.HttpRequestHandlerAdapter,</span>
  org.springframework.web.servlet.mvc.SimpleControllerHandlerAdapter,</span>
  org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter,</span>
  org.springframework.web.servlet.function.support.HandlerFunctionAdapter


org.springframework.web.servlet.HandlerExceptionResolver=org.springframework.web.servlet.mvc.method.annotation.ExceptionHandlerExceptionResolver,</span>
  org.springframework.web.servlet.mvc.annotation.ResponseStatusExceptionResolver,</span>
  org.springframework.web.servlet.mvc.support.DefaultHandlerExceptionResolver

org.springframework.web.servlet.RequestToViewNameTranslator=org.springframework.web.servlet.view.DefaultRequestToViewNameTranslator

org.springframework.web.servlet.ViewResolver=org.springframework.web.servlet.view.InternalResourceViewResolver

org.springframework.web.servlet.FlashMapManager=org.springframework.web.servlet.support.SessionFlashMapManager

在context成功的refresh过后,onRefresh方法就会被调用,然后它会调用initStrategies 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
  protected void onRefresh(ApplicationContext context) {
    initStrategies(context);
  }

  // Initialize the strategy objects that this servlet uses.
  protected void initStrategies(ApplicationContext context) {
    initMultipartResolver(context);
    initLocaleResolver(context);
    initThemeResolver(context);
    initHandlerMappings(context);
    initHandlerAdapters(context);
    initHandlerExceptionResolvers(context);
    initRequestToViewNameTranslator(context);
    initViewResolvers(context);
    initFlashMapManager(context);
  }

Read on →

第二章 简单动态字符串

SDS还被用作AOF缓冲区

1
2
3
4
5
6
7
8
9
10
struct sdshdr {
  //buf数组中已使用字节的数量
  int len;

  // buf数组中未使用字节的数量
  int free;

  // 保存字符串,字符串的最后一个字节保存空字符'\0'
  char buf[];
}

2.2 SDS与C字符串区别

2.2.1 O(1)获取字符串长度

2.2.2 杜绝缓冲区溢出

1
char *strcat(char *dest, cont char *src)

C字符串不记录自身的长度,strcat函数假定dest已有足够多的内存。

当SDS API对SDS进行修改时,API会先检查SDS的空间是否满足修改所需。

减少修改字符串时带来的内存重分配

C字符串每次增长或者缩短,程序总要对保存这个C字符串的数组进行一次内存重分配。 但Redis作为数据库,数据被频繁修改。

SDS空间预分配

当SDS的API对一个SDS进行修改,且需要对SDS进行空间扩展。程序不仅为SDS分配所必要的空间,还会分配额外空间。 * 修改后SDS的len小于1MB,那么程序分配和len属性同样大小的未使用空间。 * 修改后SDS的len大于1MB,则额外分配1MB。

SDS惰性空间释放

当API需要缩短字符串,程序不会立即进行内存重分配。而使用free属性记录,以待将来使用。

2.2.4 二进制安全

buf属性为字节数组,保存一系列二进制数据。

2.2.5 兼容部分C字符串函数

sds->buf

Read on →

第一章 初识Kafka

broker,发布消息的中间点

1.2

消息和批次

Kafka的数据单元被称为消息。为键生成一个一致性散列值,而后用散列值对主题分区数进行取模,为消息选取分区。这样保证相同键的消息总是被写到相同的分区。

为了提高消息,消息被分批次写入Kafka。批次是一组消息,这些消息同属同一个主题和分区。要在时间延迟和吞吐量之间作出权衡。批次数据会被压缩。

模式

JSON、XML易用、可读性好,但缺乏强类型的处理能力。不同版本间的兼容性也不好。

主题和分区

主题好比数据库的表,主题可以分为若干个分区,一个分区就是一个提交日志。消息以追加的方式写入分区,以先进先出的顺序读取。 一个主题可以包含多个分区,无法在整个主题范围保证消息的顺序,但消息在单个分区是有序。 Kafka通过分区实现数据冗余和伸缩性。

生产者和消费者

生产者在默认情况下把消息均衡地分布到主题的所有分区上。生产者也可以自定义分区器,将消息映射到指定分区。

消费者订阅一个或多个主题,按消息生成的顺序读取。检查消息的偏移量来区分已经读过的消息。 偏移量是一种元数据,不断递增的整数,在创建消息时,Kafka会把它添加到消息里。

消费者群组保证每个分区只能被一个消息者使用。如果一个消费者失效,群组里的其他消费者可以接管。

broker和集群

一个独立的Kafka称为broker。broker接收生产者消息,为消息设置偏移量,提交至硬盘。broker为消费者提供服务,响应读取,返回磁盘上的消息。

每个集群都有一个broker作为集群控制器。负责管理,把分区分配给broker,监控broker。 在集群中,一个分区属于一个broker,该broker是分区首领。把一个分区分配给多个broker,则会发生分区复制。

消息保留。一段时间(7天)或达到一定的字节数(1GB) 可以配置紧凑型日志,只有最后一个带有特定键的消息会被保留下来。

多集群好处

  • 数据类型分离
  • 安全需要
  • 多数据中心,灾难恢复。

为什么选择Kafka

  • 推送和拉取模型。
  • 多个生产者
  • 多个消费者。 与其他队列系统不同,其他队列系统的消息一旦被一个客户端读取,其他客户端就无法再读取。
  • 基于磁盘的数据储存。
  • 伸缩性,ActiveMQ无法满足横向扩展。
  • 高性能

1.4 数据生态系统

Kafka为客户端提供一致的接口。生产者和消费者之间不再有紧密的耦合。

Read on →

第二章 java内存区域与内存溢出异常

2.2 运行时数据区域

程序计数器(Program Counter Register):线程私有。是一块较小的内存空间,是当前线程所执行的字节码的行号指示器。 Java虚拟机的多线程是通过线程轮流切换并分配处理器执行时间的方式来实现的。每条线程都需要有一个独立的程序计数器。

Java虚拟机栈(VM Stack):线程私有。每个方法在执行都会创建一个栈帧(Stack Frame)用于储存局部变量表、操作数栈、动态链接、方法出口等。 局部变量表存放了编译器可知的各种基本数据类型(boolean、byte、char、short、int、float、long、double)、对象引用(reference类型)、returnAddress类型(指向了一条字节码指令的地址)。64位长度的long和double类型的数据会占用2个局部变量空间(Slot),其余的数据类型占用1个。

本地方法栈(Native Method Stack):线程私有。(HashCode算法,线程实现)。

Java堆(Java Heap):线程共享。是Java虚拟机所管理的内存中最大的一块,虚拟机启动时创建。所有的对象实例以及数组都要在堆上分配。 堆是垃圾收集器管理的主要区域。堆中可能划分出多个线程私有的分配缓冲区(Thread Local Allocation Buffer, TLAB)。

方法区(Method Area):线程共享。存储已被虚拟机加载的类信息、常量、静态变量、即时编译器编译后的代码等。 运行时常量池(Runtime Constant Pool)是方法区的一部分。Class文件中除了有类的版本、字段、方法、接口等描述信息外。还有常量池(Constant Pool Table),存放编译期生成的字面量和符号引用。

2.3 Hopspot

2.3.1 对象创建

虚拟机遇到new指令时,首先检查这个指令的参数是否能在常量池中定位到一个类的符号引用,并且检查这个符号引用代表的类是否已被加载、解析和初始化过。如果没有,必须先进行类加载。

指针碰撞 空闲列表

2.3.2 对象的内存布局

对象在内存中储存的布局可以分为:对象头(Header)、实例数据(Instance Data)和对齐填充(Padding)。 对象头:包括两部分。第一部分用于储存对象自身的运行时数据,如哈希码(HashCode)、GC分代年龄、锁状态标志、线程持有的锁、偏向线程ID、偏向时间戳。另一部分是类型指针,通过这个指针确定这个对象是哪个类的实例。 如果对象是一个Java数组,在对象头还有一块用于记录数组长度。 实例数据:对象储存的有效信息,代码中所定义的各种类型的字段内容。 对齐填充:对象的大小必须是8字节的整数倍。对象头已经是8字节的倍数(1倍或2倍)。

2.3.3 对象的访问定位

通过栈上的reference数据来操作堆上的具体对象。 句柄访问:好处是reference中存储的是稳定的句柄地址,在对象被移动(垃圾回收时)时只会改变句柄中的实例数据指针。 直接指针访问:速度更快,节省了一次指针定位的开销。

2.4 实战: OutOfMemoryError异常

2.4.1 Java堆溢出。OutOfMemoryError: Java heap space

不断地创建对象,并且保证GC Roots到对象之间有可达路径。

2.4.2 虚拟机栈和本地方法栈溢出

递归调用方法会得到 StackOverFlowError异常。 内存有限,每个线程分配到的栈容量越大,可以建立的线程数越少,建立线程时越容易把剩下的内存耗尽。

2.4.3 方法区和运行时常量池溢出。

不断创建新的String对象,得到OutOfMemoryError:PermGen Space String.intern()是一个Native方法,如果字符串常量池中已经包含一个等于此String对象的字符串,则返回常量池中的String对象。

Read on →

只显示部分代码块, 省略部分条件判断、异常处理和日志

Reflector, ReflectorFactory

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
public class DefaultReflectorFactory implements ReflectorFactory {
  private boolean classCacheEnabled = true;
  private final ConcurrentMap<Class<?>, Reflector> reflectorMap = new ConcurrentHashMap<>();

  @Override
  public Reflector findForClass(Class<?> type) {
    if (classCacheEnabled) {
      return reflectorMap.computeIfAbsent(type, Reflector::new);
    } else {
      return new Reflector(type);
    }
  }

}

/**
 * This class represents a cached set of class definition information that
 * allows for easy mapping between property names and getter/setter methods.
 */
public class Reflector {
  private final Class<?> type;
  private final String[] readablePropertyNames;
  private final String[] writeablePropertyNames;
  private final Map<String, Invoker> setMethods = new HashMap<>();
  private final Map<String, Invoker> getMethods = new HashMap<>();
  private final Map<String, Class<?>> setTypes = new HashMap<>();
  private final Map<String, Class<?>> getTypes = new HashMap<>();
  private Constructor<?> defaultConstructor;

  private Map<String, String> caseInsensitivePropertyMap = new HashMap<>();

  public Reflector(Class<?> clazz) {
    type = clazz;
    addDefaultConstructor(clazz);
    addGetMethods(clazz);
    addSetMethods(clazz);
    addFields(clazz);
    readablePropertyNames = getMethods.keySet().toArray(new String[getMethods.keySet().size()]);
    writeablePropertyNames = setMethods.keySet().toArray(new String[setMethods.keySet().size()]);
    for (String propName : readablePropertyNames) {
      caseInsensitivePropertyMap.put(propName.toUpperCase(Locale.ENGLISH), propName);
    }
    for (String propName : writeablePropertyNames) {
      caseInsensitivePropertyMap.put(propName.toUpperCase(Locale.ENGLISH), propName);
    }
  }

  private void addGetMethods(Class<?> cls) {
    Map<String, List<Method>> conflictingGetters = new HashMap<>();
    Method[] methods = getClassMethods(cls);
    for (Method method : methods) {
      if (method.getParameterTypes().length > 0) {
        continue;
      }
      String name = method.getName();
      if ((name.startsWith("get") && name.length() > 3)
          || (name.startsWith("is") && name.length() > 2)) {
        name = PropertyNamer.methodToProperty(name);
        addMethodConflict(conflictingGetters, name, method);
      }
    }
    resolveGetterConflicts(conflictingGetters);
  }

  private void resolveGetterConflicts(Map<String, List<Method>> conflictingGetters) {
    for (Entry<String, List<Method>> entry : conflictingGetters.entrySet()) {
      Method winner = null;
      String propName = entry.getKey();
      for (Method candidate : entry.getValue()) {
        if (winner == null) {
          winner = candidate;
          continue;
        }
        Class<?> winnerType = winner.getReturnType();
        Class<?> candidateType = candidate.getReturnType();
        if (candidateType.equals(winnerType)) {
          if (!boolean.class.equals(candidateType)) {
            throw new ReflectionException("");
          } else if (candidate.getName().startsWith("is")) {
            winner = candidate;
          }
        } else if (candidateType.isAssignableFrom(winnerType)) {
          // OK getter type is descendant
        } else if (winnerType.isAssignableFrom(candidateType)) {
          winner = candidate;
        } else {
          throw new ReflectionException("");
        }
      }
      addGetMethod(propName, winner);
    }
  }

  private void addGetMethod(String name, Method method) {
    if (isValidPropertyName(name)) {
      getMethods.put(name, new MethodInvoker(method));
      Type returnType = TypeParameterResolver.resolveReturnType(method, type);
      getTypes.put(name, typeToClass(returnType));
    }
  }

  private boolean isValidPropertyName(String name) {
    return !(name.startsWith("$") || "serialVersionUID".equals(name) || "class".equals(name));
  }
}
Read on →

一个简单的插件例子

1
2
3
4
5
6
7
8
9
@Intercepts({
  @Signature(type=Executor.class, method="query", args={
    MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class
  }),
  @Signature(type=Executor.class, method="close", args={boolean.class})
})
pulic class ExamplePlugin implements Interceptor {
  ...
}

实现

session/Configuration.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public Executor newExecutor(Transaction transaction, ExecutorType executorType) {
  executorType = executorType == null ? defaultExecutorType : executorType;
  executorType = executorType == null ? ExecutorType.SIMPLE : executorType;
  Executor executor;
  if (ExecutorType.BATCH == executorType) {
    executor = new BatchExecutor(this, transaction);
  } else if (ExecutorType.REUSE == executorType) {
    executor = new ReuseExecutor(this, transaction);
  } else {
    executor = new SimpleExecutor(this, transaction);
  }
  if (cacheEnabled) {
    executor = new CachingExecutor(executor);
  }
  // interceptorChain记录了配置的拦截器
  executor = (Executor) interceptorChain.pluginAll(executor);
  return executor;
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public interface Interceptor {
  Object intercept(Invocation invocation) throws Throwable;

  Object plugin(Object target);

  void setProperties(Properties properties);
}

public class InterceptorChain {
  private final List<Interceptor> interceptors = new ArrayList<>();

  public Object pluginAll(Object target) {
    for (Interceptor interceptor : interceptors) {
      target = interceptor.plugin(target);
    }
    return target;
  }

  public void addInterceptor(Interceptor interceptor) {
    interceptors.add(interceptor);
  }

  public List<Interceptor> getInterceptors() {
    return Collections.unmodifiableList(interceptors);
  }
}

public class Plugin implements InvocationHandler {
  private final Object target;
  private final Interceptor interceptor;
  //@Signature注解的信息
  private final Map<Class<?>, Set<Method>> signatureMap;

  private Plugin(Object target, Interceptor interceptor, Map<Class<?>, Set<Method>> signatureMap) {
    this.target = target;
    this.interceptor = interceptor;
    this.signatureMap = signatureMap;
  }

  public static Object wrap(Object target, Interceptor interceptor) {
    // 获取@Signature注解信息
    Map<Class<?>, Set<Method>> signatureMap = getSignatureMap(interceptor);
    Class<?> type = target.getClass();
    Class<?>[] interfaces = getAllInterfaces(type, signatureMap);
    if (interfaces.length > 0) {
      // 使用JDK动态代理创建代理对象
      return Proxy.newProxyInstance(
          type.getClassLoader(),
          interfaces,
          new Plugin(target, interceptor, signatureMap));
    }
    return target;
  }

  @Override
  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    try {
      Set<Method> methods = signatureMap.get(method.getDeclaringClass());
      // 方法需要被拦截
      if (methods != null && methods.contains(method)) {
        return interceptor.intercept(new Invocation(target, method, args));
      }
      // 方法不需要拦截,直接调用
      return method.invoke(target, args);
    } catch (Exception e) {
      throw ExceptionUtil.unwrapThrowable(e);
    }
  }
}

分页插件PageInterceptor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
@Intercepts({
  @Signature(type=Executor.class, method="query", args={
    MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class
  })
})
pulic class PageInterceptor implements Interceptor {
  public Object plugin(Object target){
    return Plugin.wrap(target, this);
  }

  public Object intercept(final Invocation invocation) throws Throwable {
    final Object[] queryArgs = invocation.getArgs();
    final MappedStatement ms = (MappedStatement) queryArgs[0];
    final Object parameter = queryArgs[1];
    final RowBounds rowBounds = (RowBounds) queryArgs[2];

    int offset = rowBound.getOffset();
    int limit = rowBound.getLimit();

    final BoundSql bondSql = mappedStatement.getBoundSql(parameter);
    final StringBuffer bufferSql = new StringBuffer(boundSql.getSql());
    String sql = getFormmatSql(bufferSql.toString().trim())

    sql = getPagingSql(sql, offset, limit)

    //需要重置RowBounds
    queryArgs[2] = new RowBounds(RowBounds.NOROWOFFSET, RowBound.NOROWLIMIT);
    //根据最新的SQL,创建新的MappedStatement
    queryArgs[0] = createMappedStatement(mappedStatement, boundSql, sql);
    return invocation.proceed();
  }

  public String getPagingSql(String sql, int offset, int limit){
    sql = sql.trim();
    boolean hasForUpdate = false;
    String forUpdatePart = "for update";
    if(sql.endsWith(forUpdatePart)){
      sql = sql.substring(0, sql.length()-forUpdatePart.length());
      hasForUpdate = true
    }

    StringBuffer result = new StringBuffer(sql.length())
    result.append(sql).append(" limit ");
    if(offset > 0){
      result.append(offset).append(",").append(limit);
    } else {
      result.append(limit)
    }

    if(hasForUpdate){
      result.append(" for update");
    }
    return reesult.toString();
  }
}

第一章

数据库:物理操作系统文件的集合。 实例:后台线程和一个共享内存区。

单进程多线程架构。

配置文件 * /etc/my.cnf * /etc/mysql/my.cnf * /usr/local/mysql/etc/my.cnf * .my.cnf

体系结构

  • 连接池
  • 管理服务和工具组件
  • SQL接口
  • 查询分析器
  • 优化器
  • 缓存
  • 插件式储存引擎
  • 物理文件

InnoDB

面向OLTP。多版本并发控制MVCC。 REPEATABLE READ

聚集clustered方法,表的储存按主键的顺序存放。如果没有显式指定主键,则会为每行生成一个6字节的ROWID作为主键。

MyISAM

不支持事务、表锁设计,支持全文索引。主要面向OLAP。

由MYD和MYI组成。MYD存放数据,MYI存放索引文件。

第二章 InnoDB

后台线程

  • Master Thread。负责缓冲池中的数据异步刷新到磁盘,脏页刷新、合并插入缓冲、UNDO页的回收。
  • IO Thread. 使用AIO,共有4个线程, write, read, insert buffer, log
  • Purge Thread. 事务提交后,undolog不再需要。该线程回收已经使用并分配的undo页。
  • Page Cleaner Thread。脏页刷新。

内存

缓冲池。

储存引擎基于磁盘储存,记录按照页的方式进行管理。缓冲池是一块内存区域,下一次再读取相同的页,先在缓冲池中找。

对于数据库页的修改,首先修改缓冲池的页,再以一定频率刷新到硬盘。

缓冲池中的数据页类型有:索引页、数据页、undo页、insert buffer、adaptive hash index、lock info、 数据字典信息。

LRU List、Free List、Flush List

缓冲池通过LRU(Latest Recent Used)算法管理。最频繁使用页在LRU列表的前端。首先释放LRU列表中的尾端的页。 缓冲池中页的大小默认为16KB。

新读取的页,不直接放到LRU的首部,而是放到LRU列表的midpoint位置,列表长度的5/8。 把midpoint后面的列表称为old列表,之前的列表称为new列表。innodboldblocks_time表示页在midpoint需要多久才加入到LRU列表的热端。

为什么不采用朴素的LRU? 因为某些SQL操作会访问很多页,甚至全部页,但仅仅在该次查询操作,并不是活跃的热点数据。

在LRU类表的页被修改后,称为脏页(Dirty Page),即缓存和硬盘的页数据不一致。数据库会通过CHECKPOINT机制将脏页刷新回磁盘,Flush列表中的页即为脏页列表。

Read on →