#0 系列目录#
-
Zookeeper系列
-
Zookeeper源码
-
Zookeeper应用
#1 为什么使用ZooKeeper# 目前有关于分布式Session的实现基本上都是基于memcached。memcached本质上是一个内存缓存系统。虽然memcached也可以是分布式集群环境的,但是对于一份数据来说,它总是存储在某一台memcached服务器上
。如果发生网络故障或是服务器宕机,则存储在这台服务器上的所有数据都将不可访问
。由于数据是存储在内存中的,重启服务器,将导致数据全部丢失
。当然你可以自己实现一套机制,用来在分布式memcached之间进行数据的同步和持久化,但是实现这套机制谈何容易!
由上述ZooKeeper的特性可知,ZooKeeper是一个分布式小文件系统,并且被设计为高可用性。通过选举算法和集群复制可以避免单点故障
,由于是文件系统,所以即使所有的ZooKeeper节点全部挂掉,数据也不会丢失,重启服务器之后,数据即可恢复
。另外ZooKeeper的节点更新是原子的,也就是说更新不是成功就是失败
。通过版本号,ZooKeeper实现了更新的乐观锁
,当版本号不相符时,则表示待更新的节点已经被其他客户端提前更新了,而当前的整个更新操作将全部失败。当然所有的一切ZooKeeper已经为开发者提供了保障,我们需要做的只是调用API。
有人会怀疑ZooKeeper的执行能力,在ZooKeeper诞生的地方——Yahoo!给出了一组数据将打消你的怀疑。它的吞吐量标准已经达到大约每秒10000基于写操作的工作量
。对于读操作的工作量来说,它的吞吐量标准还要高几倍
。
#2 实现分布式Session所面临的挑战# 实现分布式session最大的挑战莫过于如何实现session在分布式系统之间的共享
。在分布式环境下,每个子系统都是跨网络的独立JVM,在这些JVM之间实现共享数据的方式无非就是TCP/IP通讯
。无论是memcached,还是ZooKeeper,底层都是基于TCP/IP的
。所以,我认为使用何种工具实现分布式Session都是可行的,没有那种实现优于另外一种实现,在不同的应用场景,各有优缺点。世间万物,无十全十美,不要盲目的崇拜某种技术,唯有适合才是真理。
##2.1 Session ID的共享## Session ID是一个实例化Session对象的唯一标识,也是它在Web容器中可以被识别的唯一身份标签。Jetty和Tomcat容器会通过一个Hash算法,得到一个唯一的ID字符串,然后赋值给某个实例化的Session,此时,这个Session就可以被放入Web容器的SessionManager中开始它短暂的一生。在Servlet中,我们可以通过HttpSession的getId()方法得到这个值,但是我们无法改变这个值。当Session走到它一生尽头的时候,Web容器的SessionManager会根据这个ID将其“火化”
。所以Session ID是非常重要的一个属性,并且要保证它的唯一性。在单系统中,Session ID只需要被自身的Web容器读写,但是在分布式环境中,多个Web容器需要共享同一个Session ID
。因此,当某个子系统的Web容器产生一个新的ID时,它必须需要一种机制来通知其他子系统,并且告知新ID是什么
。
##2.2 Session中数据的复制## 和共享Session ID的问题一样,在分布式环境下,Session中的用户数据也需要在各个子系统中共享
。当用户通过HttpSession的setAttribute()方法在Session中设置了一个用户数据时,它只存在于当前与用户交互的那个Web容器中,而对其他子系统的Web容器来说,这些数据是不可见的。当用户在下一步跳转到另外一个Web容器时,则又会创建一个新的Session对象,而此Session中并不包含上一步骤用户设置的数据。其实Session在分布式系统之间的复制实现是简单的,但是每次在Session数据发生变化时,都在子系统之间复制一次数据,会大大降低用户的响应速度
。因此我们需要一种机制,即可以保证Session数据的一致性,又不会降低用户操作的响应度
。
##2.3 Session的失效## Session是有生命周期的,当Session的空闲时间(maxIdle属性值)超出限制时,Session就失效了,这种设计主要是考虑到了Web容器的可靠性。当一个系统有上万人使用时,就会产生上万个Session对象,由于HTTP的无状态特性,服务器无法确切的知道用户是否真的离开了系统。因此如果没有失效机制,所有被Session占据的内存资源将永远无法被释放,直到系统崩溃为止。在分布式环境下,Session被简单的创建,并且通过某种机制被复制到了其他系统中。你无法保证每个子系统的时钟都是一致的,可能相差几秒,甚至相差几分钟
。当某个Web容器的Session失效时,可能其他的子系统中的Session并未失效,这时会产生一个有趣的现象,一个用户在各个子系统之间跳转时,有时会提示Session超时,而有时又能正常操作。因此我们需要一种机制,当某个系统的Session失效时,其他所有系统的与之相关联的Session也要同步失效
。
##2.4 类装载问题## 在单系统环境下,所有类被装载到“同一个”ClassLoader中。我在同一个上打了引号,因为实际上并非是同一个ClassLoader,只是逻辑上我们认为是同一个。这里涉及到了JVM的类装载机制,由于这个主题不是本文的讨论重点,所以相关详情可以参考相关的JVM文档。因此即使是由memcached或是ZooKeeper返回的字节数组也可以正常的反序列化成相对应的对象类型。但是在分布式环境下,问题就变得异常的复杂。我们通过一个例子来描述这个问题。用户在某个子系统的Session中设置了一个User类型的对象,通过序列化,将User类型的对象转换成字节数组,并通过网络传输到了memcached或是ZooKeeper上。此时,用户跳转到了另外一个子系统上,需要通过getAttribute方法从memcached或是ZooKeeper上得到先前设置的那个User类型的对象数据
。但是问题出现了,在这个子系统的ClassLoader中并没有装载User类型。因此在做反序列化时出现了ClassNotFoundException异常
。
当然上面描述的4点挑战只是在实现分布式Session过程中面临的关键问题,并不是全部。其实在我实现分布式Session的整个过程中还遇到了其他的一些挑战。比如,需要通过filter机制拦截HttpServletRequest,以便覆盖其getSession方法
。但是在不同的Web容器中(例如Jetty或是Tomcat)对HttpServletRequest的实现是不一样的,虽然都是实现了HttpServletRequest接口,但是各自又添加了一些特性在其中。例如,在Jetty容器中,HttpSession的实现类是一个保护内部类,无法从其继承并覆盖相关的方法,只能从其实现类的父类中继承更加抽象的Session实现。这样就会造成一个问题,我必须重新实现对Session整个生命周期管理的SessionManager接口。有人会说,那就放弃它的实现吧,我们自己实现HttpSession接口。很不幸,那是不可能的。因为在Jetty的HttpServletRequest实现类的一些方法中对Session的类型进行了强制转换(转换成它自定义的HttpSession实现类),如果不从其继承,则会出现ClassCastException异常
。相比之下,Tomcat的对HttpServletRequest和HttpSession接口的实现还是比较标准的。由此可见,实现分布式Session其实是和某种Web容器紧密耦合的
。并不像网上有些人的轻描淡写,仅仅覆盖setAttribute和getAttribute方法是行不通的。
#3 算法实现# 从上述的挑战来看,要写一个分布式应用程序是困难的,主要原因是因为局部故障。由于数据需要通过网络传输,而网络是不稳定的,所以如果网络发生故障,则所有的数据通讯都将终止。ZooKeeper并不能解决网络故障的发生,甚至它本身也是基于网络的分布式应用程序。但是它为我们提供了一套工具集合,帮助我们建立安全处理局部故障的分布式应用程序。接下来我们就开始描述如何实现基于ZooKeeper的分布式Session系统。
##3.1 基于ZooKeeper的分布式Session系统架构##
为了实现高可用性,采用了ZooKeeper集群,ZooKeeper集群是由一台领导者服务器和若干台跟随者服务器构成(总服务器数要奇数)
。所有的读操作由跟随者提供,而写操作由领导者提供,并且领导者还负责将写入的数据复制到集群中其他的跟随者
。当领导者服务器由于故障无法访问时,剩下的所有跟随者服务器就开始进行领导者的选举。通过选举算法,最终由一台原本是跟随者的服务器升级为领导者。当然原来的领导者服务器一旦被恢复,它就只能作为跟随者服务器,并在下一次选举中争夺领导者的位置。
Web容器中的Session容器也将发生变化。它不再对用户的Session进行本地管理,而是委托给ZooKeeper和我们自己实现的Session管理器。也就是说,ZooKeeper负责Session数据的存储,而我们自己实现的Session管理器将负责Session生命周期的管理
。
最后是关于在分布式环境下共享Session ID的策略
。我们还是通过客户端的Cookie来实现,我们会自定义一个Cookie,并通过一定的算法在多个子系统之间进行共享
。下面会对此进行详细的描述。
##3.2 分布式Session的数据模型## Session数据的存储是有一定格式的,下图展示了一个Session ID为”1gyh0za3qmld7”的Session在ZooKeeper上的存储结构:
“/SESSIONS”是一个组节点,用来在ZooKeeper上划分不同功能组的定义
。你可以把它理解为一个文件夹目录。在这个目录下可以存放0个或N个子节点,我们就把一个Session的实例作为一个节点,节点的名称就是Session ID
。在ZooKeeper中,每个节点本身也可以存放一个字节数组。因此,每个节点天然就是一个Key-Value键值对的数据结构
。
我们将Session中的用户数据(本质上就是一个Map)设计成多节点,节点名称就是Session的key,而节点的数据就是Session的Value
。采用这种设计主要是考虑到性能问题和ZooKeeper对节点大小的限制问题。当然,我们可以将Session中的用户数据保存在一个Map中,然后将Map序列化之后存储在对应的Session节点中。但是大部分情况下,我们在读取数据时并不需要整个Map,而是Map中的一个或几个值
。这样就可以避免一个非常大的Map在网络间传来传去。同理,在写Session的时候,也可以最大限度的减少数据流量
。另外由于ZooKeeper是一个小文件系统,为了性能,每个节点的大小为1MB
。如果Session中的Map大于1MB,就不能单节点的存储了。当然,一个Key的数据量是很少会超过1MB的,如果真的超过1MB,你就应该考虑一下,是否应该将此数据保存在Session中。
最后我们来关注一下Session节点中的数据——SessionMetaData。它是一个Session实例的元数据,保存了一些与Session生命周期控制有关的数据。以下代码就是SessionMetaData的实现:
package com.king.distributedSession;import java.io.Serializable;public class SessionMetaData implements Serializable { private static final long serialVersionUID = -6446174402446690125L; /** * Session实例的ID */ private String id; /** * session的创建时间 */ private Long createTm; /** * Session的最大空闲时间,默认情况下是30分钟。 */ private Long maxIdle; /** * Session的最后一次访问时间,每次调用Request.getSession方法时都会去更新这个值。用来计算当前Session是否超时。 * 如果lastAccessTm+maxIdle小于System.currentTimeMillis(),就表示当前Session超时。 */ private Long lastAccessTm; /** * 当前Session是否可用,如果超时,则此属性为false。 */ private Boolean validate = false; /** * 为了冗余Znode的version值,用来实现乐观锁,对Session节点的元数据进行更新操作。 */ private int version = 0; /** * 构造方法 */ public SessionMetaData() { this.createTm = System.currentTimeMillis(); this.lastAccessTm = this.createTm; this.validate = true; } public String getId() { return id; } public void setId(String id) { this.id = id; } public Long getCreateTm() { return createTm; } public void setCreateTm(Long createTm) { this.createTm = createTm; } public Long getMaxIdle() { return maxIdle; } public void setMaxIdle(Long maxIdle) { this.maxIdle = maxIdle; } public Long getLastAccessTm() { return lastAccessTm; } public void setLastAccessTm(Long lastAccessTm) { this.lastAccessTm = lastAccessTm; } public Boolean getValidate() { return validate; } public void setValidate(Boolean validate) { this.validate = validate; } public int getVersion() { return version; } public void setVersion(int version) { this.version = version; }}
这里有必要提一下一个老生常谈的问题,就是所有存储在节点上的对象必须是可序列化的,也就是必须实现Serializable接口,否则无法保存
。这个问题在memcached和ZooKeeper上都存在的。
##3.3 实现过程## 实现分布式Session的第一步就是要定义一个filter,用来拦截HttpServletRequest对象
。以下代码片段,展现了在Jetty容器下的filter实现。
package com.king.distributedSession.jetty;import javax.servlet.*;import java.io.IOException;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class JettyDistributedSessionFilter extends DistributedSessionFilter { private Logger log = LoggerFactory.getLogger(getClass()); @Override public void init(FilterConfig filterConfig) throws ServletException { super.init(filterConfig); // 实例化Jetty容器下的Session管理器 sessionManager = new JettyDistributedSessionManager(conf); try { sessionManager.start(); // 启动初始化 //创建组节点 ZooKeeperHelper.createGroupNode(); log.debug("DistributedSessionFilter.init completed."); } catch (Exception e) { log.error("distributedSessionFilter init ", e); } } @Override public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { // Jetty容器的Request对象包装器,用于重写Session的相关操作 JettyRequestWrapper req = new JettyRequestWrapper(request, sessionManager); chain.doFilter(req, response); }}
这个filter是继承自DistributedSessionFilter的,这个父类主要是负责完成初始化参数设置等通用方法的实现
,代码如下所示:
package com.king.distributedSession.jetty;import javax.servlet.Filter;import javax.servlet.FilterConfig;import javax.servlet.ServletException;import org.apache.commons.lang3.StringUtils;import org.eclipse.jetty.server.SessionManager;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public abstract class DistributedSessionFilter implements Filter { protected Logger log = LoggerFactory.getLogger(getClass()); /** * 参数配置 */ protected Configuration conf; /** * Session管理器 */ protected SessionManager sessionManager; /** * 初始化参数名称 */ public static final String SERVERS = "servers"; public static final String TIMEOUT = "timeout"; public static final String POOLSIZE = "poolsize"; /** * 初始化 * * @see javax.servlet.Filter#init(javax.servlet.FilterConfig) */ @Override public void init(FilterConfig filterConfig) throws ServletException { conf = new Configuration(); String servers = filterConfig.getInitParameter(SERVERS); if (StringUtils.isNotBlank(servers)) { conf.setServers(servers); } String timeout = filterConfig.getInitParameter(TIMEOUT); if (StringUtils.isNotBlank(timeout)) { try { conf.setTimeout(Long.valueOf(timeout)); } catch (NumberFormatException ex) { log.error("timeout parse error[" + timeout + "]."); } } String poolSize = filterConfig.getInitParameter(POOLSIZE); if (StringUtils.isNotBlank(poolSize)) { try { conf.setPoolSize(Integer.valueOf(poolSize)); } catch (NumberFormatException ex) { log.error("poolsize parse error[" + poolSize + "]."); } } //初始化ZooKeeper配置参数 ZooKeeperHelper.initialize(conf); } /** * 销毁 * * @see javax.servlet.Filter#destroy() */ @Override public void destroy() { if (sessionManager != null) { try { sessionManager.stop(); } catch (Exception e) { log.error("sessionManager stop : ", e); } } //销毁ZooKeeper ZooKeeperHelper.destroy(); log.debug("DistributedSessionFilter.destroy completed."); }}
在filter中需要关注的重点是doFilter方法:
@Override public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { // Jetty容器的Request对象包装器,用于重写Session的相关操作 JettyRequestWrapper req = new JettyRequestWrapper(request, sessionManager); chain.doFilter(req, response); }
这里实例化了一个包装器(装饰者模式)类,用来包装Jetty容器的Request对象,并覆盖其getSession方法
。另外我们还自己实现sessionManager接口,用来管理Session的生命周期
。通过filter机制,我们就接管了Session的整个生命周期的管理权。
接下来我们来看看,Request包装器是如何重写getSession方法,替换成使用ZooKeeper上的Session数据
。关键代码如下所示:
@Override public HttpSession getSession(boolean create) { // 检查Session管理器 if (sessionManager == null && create) { throw new IllegalStateException("No SessionHandler or SessionManager"); } if (session != null && sessionManager != null) { return session; } session = null; //从客户端cookie中查找Session ID String id = sessionManager.getRequestSessionId(request); log.debug("获取客户端的Session ID:[" + id + "]"); if (id != null && sessionManager != null) { // 如果存在,则先从管理器中取 session = sessionManager.getHttpSession(id, request); if (session == null && !create) { return null; } } // 否则实例化一个新的Session对象 if (session == null && sessionManager != null && create) { session = sessionManager.newHttpSession(request); } return session; }
其实实现很简单,大部分工作都委托给了sessionManager来处理。因此,还是让我们来关注sessionManager的相关方法实现:
- 获取Session ID:
@Override public String getRequestSessionId(HttpServletRequest request) { return CookieHelper.findSessionId(request); }
这个方法就是从客户端的Cookies中查找我们的一个自定义的Cookie值,这个Cookie的名称为:”DISTRIBUTED_SESSION_ID”(Web容器自己也在Cookie中写了一个值,用来在不同的request中传递Session ID,这个Cookie的名称叫“JSESSIONID”)
。如果返回null,则表示客户端从来都没有创建过Session实例。
如果返回的Cookie值不为null,则有3种可能性:
其一,已经实例化过一个Session对象并且可以正常使用;其二,虽然已经实例化过了,但是可能此Session已经超时失效;其三,分布式环境中的其他子系统已经实例化过了,但是本系统中还未实例化过此Session对象。所以先要对已经存在的Session ID进行处理。关键代码如下:
@Override public HttpSession getHttpSession(String id, HttpServletRequest request) { // 类型检查 if (!(request instanceof Request)) { log.warn("不是Jetty容器下的Request对象"); return null; } // 将HttpServletRequest转换成Jetty容器的Request类型 Request req = (Request) request; // ZooKeeper服务器上查找指定节点是否有效 boolean valid = ZooKeeperHelper.isValid(id); // 如果为false,表示服务器上无该Session节点,需要重新创建(返回null) if (!valid) { // 删除本地的副本 sessions.remove(id); return null; } else { // 更新Session节点的元数据 ZooKeeperHelper.updateSessionMetaData(id); HttpSession session = sessions.get(id); // 如果存在,则直接返回 if (session != null) { return session; } // 否则创建指定ID的Session并返回(用于同步分布式环境中的其他机器上的Session本地副本) session = new JettyDistributedSession((AbstractSessionManager) req.getSessionManager(), System.currentTimeMillis(), id); sessions.put(id, session); return session; } }
首先根据ID去ZooKeeper上验证此Session是否有效,如果无效了,则直接返回null,表示此Session已经超时不可用,同时需要删除本地的“影子”Session对象(不管存在与否)。如果该节点有效,则首先更新该Session节点的元数据(例如,最后一次访问时间)。然后先到本地的Session容器中查找是否存在该ID的Session对象。本地Session容器中的Session对象并不用来保存用户数据,也不进行生命周期管理,纯粹为了在不同请求中进行传递。唯一有价值的就Session ID,因此,我喜欢把本地Session容器中的Session对象称为“影子”Session,它只是ZooKeeper上真正Session的一个影子而已。
如果Session节点没有失效,但是本地Session容器并没有指定ID的”影子”Session,则表示是第三种可能性,需要进行影子Session的同步。正如代码中所展示的,我们实例化一个指定ID的Session对象,并放入当前系统的Session容器中,这样就完成了Session ID在分布式环境中的共享,以及Session对象在各子系统之间的同步。
- 如果通过上面的方法返回的Session对象还是null,则真的需要实例化一个Session对象了,代码如下所示:
public HttpSession newHttpSession(HttpServletRequest request) { // 类型检查 if (!(request instanceof Request)) { log.warn("不是Jetty容器下的Request对象"); return null; } // 将HttpServletRequest转换成Jetty容器的Request类型 Request req = (Request) request; Session session = new JettyDistributedSession( (AbstractSessionManager) req.getSessionManager(), request); addHttpSession(session, request); String id = session.getId(); // 写cookie Cookie cookie = CookieHelper.writeSessionIdToCookie(id, req, req.getConnection() .getResponse()); if (cookie != null) { log.debug("Wrote sid to Cookie,name:[" + cookie.getName() + "],value:[" + cookie.getValue() + "]"); } // 在ZooKeeper服务器上创建session节点,节点名称为Session ID // 创建元数据 SessionMetaData metadata = new SessionMetaData(); metadata.setId(id); metadata.setMaxIdle(config.getTimeout() * 60 * 1000); //转换成毫秒 ZooKeeperHelper.createSessionNode(metadata); return session; }
以上代码会实例化一个Session对象,并将Session ID写入客户端Cookie中,最后实例化Session元数据,并在ZooKeeper上新建一个Session节点。
- 通过上面步骤,我们就将Session的整个生命周期管理与ZooKeeper关联起来了。接下来我们看看Session对象的几个重要方法的重写:
public synchronized Object getAttribute(String name) { // 获取session ID String id = getId(); if (StringUtils.isNotBlank(id)) { // 返回Session节点下的数据 return ZooKeeperHelper.getSessionData(id, name); } return null; }
public synchronized void removeAttribute(String name) { //获取session ID String id = getId(); if (StringUtils.isNotBlank(id)) { //删除Session节点下的数据 ZooKeeperHelper.removeSessionData(id, name); } }
public synchronized void setAttribute(String name, Object value) { //获取session ID String id = getId(); if (StringUtils.isNotBlank(id)) { //将数据添加到ZooKeeper服务器上 ZooKeeperHelper.setSessionData(id, name, value); } }
public void invalidate() throws IllegalStateException { //获取session ID String id = getId(); if (StringUtils.isNotBlank(id)) { //删除Session节点 ZooKeeperHelper.deleteSessionNode(id); } }
这些方法中都是直接和ZooKeeper上对应的Session进行数据交换。本来我是想在本地Session对象上创建一个ZooKeeper的缓冲,当用户调用Session的读方法时,先到本地缓冲中读数据,读不到再到ZooKeeper上去取,这样可以减少网络的通讯开销。但在分布式环境下,这种策略所带来的数据同步开销更加的可观。因为每次一个子系统的Session数据更新,都将触发所有其他子系统与之关联的Session数据同步操作,否则Session中数据的一致性将无法得到保障。
- 看到这里,大家可能已经发觉了,所有与ZooKeeper交互的代码都被封装到ZooKeeperHelper类中,接下来就来看看这个类的实现:
public class ZooKeeperHelper { private static Logger log = LoggerFactory.getLogger(ZooKeeperHelper.class); private static String hosts; private static ExecutorService pool = Executors.newCachedThreadPool(); private static final String GROUP_NAME = "/SESSIONS"; /** * 初始化 */ public static void initialize(Configuration config) { hosts = config.getServers(); } /** * 销毁 */ public static void destroy() { if (pool != null) { pool.shutdown(); } } /** * 连接服务器 * * @return */ public static ZooKeeper connect() { ConnectionWatcher cw = new ConnectionWatcher(); return cw.connection(hosts); } /** * 关闭一个会话 */ public static void close(ZooKeeper zk) { if (zk != null) { try { zk.close(); } catch (InterruptedException e) { log.error("close zk session : ", e); } } } /** * 验证指定ID的节点是否有效 * * @param id * @return */ public static boolean isValid(String id) { ZooKeeper zk = connect(); if (zk != null) { try { return isValid(id, zk); } finally { close(zk); } } return false; } /** * 验证指定ID的节点是否有效 * * @param id * @param zk * @return */ public static boolean isValid(String id, ZooKeeper zk) { if (zk != null) { //获取元数据 SessionMetaData metadata = getSessionMetaData(id, zk); //如果不存在或是无效,则直接返回null if (metadata == null) { return false; } return metadata.getValidate(); } return false; } /** * 返回指定ID的Session元数据 */ public static SessionMetaData getSessionMetaData(String id, ZooKeeper zk) { if (zk != null) { String path = GROUP_NAME + "/" + id; try { // 检查节点是否存在 Stat stat = zk.exists(path, false); // stat为null表示无此节点 if (stat == null) { return null; } // 获取节点上的数据 byte[] data = zk.getData(path, false, null); if (data != null) { // 反序列化 Object obj = SerializationUtils.deserialize(data); // 转换类型 if (obj instanceof SessionMetaData) { SessionMetaData metadata = (SessionMetaData) obj; // 设置当前版本号 metadata.setVersion(stat.getVersion()); return metadata; } } } catch (KeeperException e) { log.error("getSessionMetaData for " + path + " : ", e); } catch (InterruptedException e) { log.error("getSessionMetaData for " + path + " : ", e); } } return null; } /** * 更新Session节点的元数据 * * @param id Session ID */ public static void updateSessionMetaData(String id) { ZooKeeper zk = connect(); try { //获取元数据 SessionMetaData metadata = getSessionMetaData(id, zk); if (metadata != null) { updateSessionMetaData(metadata, zk); } } finally { close(zk); } } /** * 更新Session节点的元数据 * * @param zk */ public static void updateSessionMetaData(SessionMetaData metadata, ZooKeeper zk) { try { if (metadata != null) { String id = metadata.getId(); Long now = System.currentTimeMillis(); //当前时间 // 检查是否过期 Long timeout = metadata.getLastAccessTm() + metadata.getMaxIdle(); //空闲时间 // 如果空闲时间小于当前时间,则表示Session超时 if (timeout < now) { metadata.setValidate(false); log.debug("Session节点已超时[" + id + "]"); } // 设置最后一次访问时间 metadata.setLastAccessTm(now); // 更新节点数据 String path = GROUP_NAME + "/" + id; byte[] data = SerializationUtils.serialize(metadata); zk.setData(path, data, metadata.getVersion()); log.debug("更新Session节点的元数据完成[" + path + "]"); } } catch (KeeperException e) { log.error("updateSessionMetaData : ", e); } catch (InterruptedException e) { log.error("updateSessionMetaData : ", e); } } /** * 返回ZooKeeper服务器上的Session节点的所有数据,并装载为Map * * @param id * @return */ public static Map getSessionMap(String id) { ZooKeeper zk = connect(); if (zk != null) { String path = GROUP_NAME + "/" + id; try { // 获取元数据 SessionMetaData metadata = getSessionMetaData(path, zk); // 如果不存在或是无效,则直接返回null if (metadata == null || !metadata.getValidate()) { return null; } // 获取所有子节点 Listnodes = zk.getChildren(path, false); // 存放数据 Map sessionMap = new HashMap<>(); for (String node : nodes) { String dataPath = path + "/" + node; Stat stat = zk.exists(dataPath, false); // 节点存在 if (stat != null) { // 提取数据 byte[] data = zk.getData(dataPath, false, null); if (data != null) { sessionMap.put(node, SerializationUtils.deserialize(data)); } else { sessionMap.put(node, null); } } } return sessionMap; } catch (KeeperException e) { log.error("getSessionMap for " + path + " : ", e); } catch (InterruptedException e) { log.error("getSessionMap for " + path + " : ", e); } finally { close(zk); } } return null; } /** * 创建一个组节点 */ public static void createGroupNode() { ZooKeeper zk = connect(); if (zk != null) { try { // 检查节点是否存在 Stat stat = zk.exists(GROUP_NAME, false); //stat为null表示无此节点,需要创建 if (stat == null) { // 创建组件点 String createPath = zk.create(GROUP_NAME, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); log.debug("创建节点完成:[" + createPath + "]"); } else { log.debug("组节点已存在,无需创建[" + GROUP_NAME + "]"); } } catch (KeeperException e) { log.error("createGroupNode for " + GROUP_NAME + " : ", e); } catch (InterruptedException e) { log.error("createGroupNode for " + GROUP_NAME + " : ", e); } finally { close(zk); } } } /** * 创建指定Session ID的节点 */ public static String createSessionNode(SessionMetaData metadata) { if (metadata == null) { return null; } ZooKeeper zk = connect(); //连接服务期 if (zk != null) { String path = GROUP_NAME + "/" + metadata.getId(); try { // 检查节点是否存在 Stat stat = zk.exists(path, false); //stat为null表示无此节点,需要创建 if (stat == null) { // 创建组件点 String createPath = zk.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); log.debug("创建Session节点完成:[" + createPath + "]"); //写入节点数据 zk.setData(path, SerializationUtils.serialize(metadata), -1); return createPath; } } catch (KeeperException e) { log.error("createSessionNode for " + path + " : ", e); } catch (InterruptedException e) { log.error("createSessionNode for " + path + " : ", e); } finally { close(zk); } } return null; } /** * 创建指定Session ID的节点(异步操作) * * @return */ public static String asynCreateSessionNode(final SessionMetaData metadata, boolean waitFor) { Callable task = new Callable () { @Override public String call() throws Exception { return createSessionNode(metadata); } }; try { Future result = pool.submit(task); // 如果需要等待执行结果 if (waitFor) { while (true) { if (result.isDone()) { return result.get(); } } } } catch (Exception e) { log.error("asynCreateSessionNode : ", e); } return null; } /** * 删除指定Session ID的节点 * * @param sid Session ID * @return */ public static boolean deleteSessionNode(String sid) { ZooKeeper zk = connect(); //连接服务期 if (zk != null) { String path = GROUP_NAME + "/" + sid; try { // 检查节点是否存在 Stat stat = zk.exists(path, false); //如果节点存在则删除之 if (stat != null) { //先删除子节点 List nodes = zk.getChildren(path, false); if (nodes != null) { for (String node : nodes) { zk.delete(path + "/" + node, -1); } } //删除父节点 zk.delete(path, -1); log.debug("删除Session节点完成:[" + path + "]"); return true; } } catch (KeeperException e) { log.error("deleteSessionNode for " + path + " : ", e); } catch (InterruptedException e) { log.error("deleteSessionNode for " + path + " : ", e); } finally { close(zk); } } return false; } /** * 删除指定Session ID的节点(异步操作) * * @param sid * @param waitFor 是否等待执行结果 * @return */ public static boolean asynDeleteSessionNode(final String sid, boolean waitFor) { Callable task = new Callable () { @Override public Boolean call() throws Exception { return deleteSessionNode(sid); } }; try { Future result = pool.submit(task); //如果需要等待执行结果 if (waitFor) { while (true) { if (result.isDone()) { return result.get(); } } } } catch (Exception e) { log.error("asynDeleteSessionNode : ", e); } return false; } /** * 在指定Session ID的节点下添加数据节点 * * @param sid Session ID * @param name 数据节点的名称 * @param value 数据 * @return */ public static boolean setSessionData(String sid, String name, Object value) { boolean result = false; ZooKeeper zk = connect(); //连接服务器 if (zk != null) { String path = GROUP_NAME + "/" + sid; try { // 检查指定的Session节点是否存在 Stat stat = zk.exists(path, false); //如果节点存在则删除之 if (stat != null) { //查找数据节点是否存在,不存在就创建一个 String dataPath = path + "/" + name; stat = zk.exists(dataPath, false); if (stat == null) { //创建数据节点 zk.create(dataPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); log.debug("创建数据节点完成[" + dataPath + "]"); } //在节点上设置数据,所有数据必须可序列化 if (value instanceof Serializable) { int dataNodeVer = -1; if (stat != null) { //记录数据节点的版本 dataNodeVer = stat.getVersion(); } byte[] data = SerializationUtils.serialize((Serializable) value); stat = zk.setData(dataPath, data, dataNodeVer); log.debug("更新数据节点数据完成[" + dataPath + "][" + value + "]"); result = true; } } } catch (KeeperException e) { log.error("setSessionData for " + path + " : ", e); } catch (InterruptedException e) { log.error("setSessionData for " + path + " : ", e); } finally { close(zk); } } return result; } /** * 删除指定Session ID的节点(异步操作) * * @param sid * @param waitFor 是否等待执行结果 * @return */ public static boolean asynSetSessionData(final String sid, final String name, final Object value, boolean waitFor) { Callable task = new Callable () { @Override public Boolean call() throws Exception { return setSessionData(sid, name, value); } }; try { Future result = pool.submit(task); // 如果需要等待执行结果 if (waitFor) { while (true) { if (result.isDone()) { return result.get(); } } } } catch (Exception e) { log.error("asynSetSessionData : ", e); } return false; } /** * 返回指定Session ID的节点下数据 * * @param sid Session ID * @param name 数据节点的名称 * @return */ public static Object getSessionData(String sid, String name) { ZooKeeper zk = connect(); //连接服务器 if (zk != null) { String path = GROUP_NAME + "/" + sid; try { // 检查指定的Session节点是否存在 Stat stat = zk.exists(path, false); if (stat != null) { //查找数据节点是否存在 String dataPath = path + "/" + name; stat = zk.exists(dataPath, false); Object obj = null; if (stat != null) { //获取节点数据 byte[] data = zk.getData(dataPath, false, null); if (data != null) { //反序列化 obj = SerializationUtils.deserialize(data); } } return obj; } } catch (KeeperException e) { log.error("getSessionData for " + path + " : ", e); } catch (InterruptedException e) { log.error("getSessionData for " + path + " : ", e); } finally { close(zk); } } return null; } /** * 删除指定Session ID的节点下数据 * * @param sid Session ID * @param name 数据节点的名称 * @return */ public static void removeSessionData(String sid, String name) { ZooKeeper zk = connect(); //连接服务器 if (zk != null) { String path = GROUP_NAME + "/" + sid; try { // 检查指定的Session节点是否存在 Stat stat = zk.exists(path, false); if (stat != null) { //查找数据节点是否存在 String dataPath = path + "/" + name; stat = zk.exists(dataPath, false); if (stat != null) { //删除节点 zk.delete(dataPath, -1); } } } catch (KeeperException e) { log.error("removeSessionData for " + path + " : ", e); } catch (InterruptedException e) { log.error("removeSessionData for " + path + " : ", e); } finally { close(zk); } } }}
从这个类的实现中我们可以发现,与ZooKeeper交互的API非常的友好,基本上就是对文件系统的管理——创建文件、删除文件、检查文件是否存在,更新文件等等。并且对节点的查找就是对文件绝对路径的搜索,效率非常的高。例如,用户调用Session的getAttribute(String key)方法,则根据当前Session可以拼装成一个搜索节点的路径:/SESSIONS/ / 。这样可以快速的定位,并获取该节点的数据。
另外,在这个类中,我还实现类一些操作的异步版本。原来是想为了提高用户响应度,在创建、修改Session节点的时候使用异步调用,但是实际测试下来还是有问题的。所以目前放弃了所有操作的异步版本。
- 最后让我们来看看连接ZooKeeper服务器的实现类,代码如下所示:
public class ConnectionWatcher implements Watcher { private Logger log = LoggerFactory.getLogger(getClass()); private static final int SESSION_TIMEOUT = 5000; private CountDownLatch signal = new CountDownLatch(1); /** * @throws IOException * @throws InterruptedException */ public ZooKeeper connection(String servers) { ZooKeeper zk; try { zk = new ZooKeeper(servers, SESSION_TIMEOUT, this); signal.await(); return zk; } catch (IOException e) { log.error("", e); } catch (InterruptedException e) { log.error("", e); } return null; } public void process(WatchedEvent event) { Event.KeeperState state = event.getState(); if (state == Event.KeeperState.SyncConnected) { signal.countDown(); } }}
这个类需要关注的是实现Watcher接口,在上面描述ZooKeeper特性的时候曾经提到过,ZooKeeper通过Watcher机制实现客户端与服务器之间的松耦合交互,在process方法中,通过对各种事件的监听,可以进行异步的回调处理。
这里的SESSION_TIMEOUT并不是Web容器中Session的超时。这是ZooKeeper对一个客户端的连接,即一个连接会话的超时设置。该值一般设置在2~5秒之间。
#4 总结# 目前基于ZooKeeper的分布式Session系统的实现还是比较初步的。还有很多功能有待完善,比如要添加Session监听事件的支持、对ZooKeeper上被标记为不可用的Session节点的删除、对Session进行监控和管理的控制台以及非常难解决的ClassLoader问题等
。另外,前文也提到了,分布式Session的实现是和某个Web容器紧密耦合的,这一点让我很不爽。因为需要针对不同的Web容器各自实现一套Session的管理机制。不过我相信通过良好的设计,可以实现通用的组件。目前我已经实现了在Jetty和Tomcat容器下的分布式Session。
在文章的最后,我们讨论一下如何解决ClassLoader问题。其实,在OSGi框架下,这个问题并不是很麻烦。因为,我们可以将所有领域对象类打包成一个单独的Bundle。同时将分布式Session的Filter实现也打包成一个Bundle。通过动态引用的方式,就可以引入所有领域对象的类型了。但在非OSGi环境下,只能将领域对象的类文件在每个子系统中都包含一份来解决ClassLoader问题。这样会造成一个问题,就是当领域对象发生变化时,我需要重启所有的子系统,来装载更新后的领域对象类,而不像在OSGi下,只需要重启这个领域对象Bundle就可以了。