初探oss-事件通知(http endpoint)

初识oss

it技术更新换代太快了,我们的思想也必须跟上才行。

最近项目里需要用到文件上传,我想想挺简单的啊,关于spring mvc文件上传网上的示例太多了啊,抄一抄,三下五除二就弄好了。

好景不长,文件上传问题太多了,虽然从头到尾只出了两个问题,但是用户体验太差了,该怎么办呢?又要支持大文件,又要支持批量, 又在web端,感觉再好的插件也拯救不了我了。于是问问公司的前辈,才刚说上几句,前辈就说怎么不用oss?

啥?oss是什么?oss(Object Storage Service)即对象存储服务。

海量、安全、低成本、高可靠的云存储服务,提供99.999999999%的数据可靠性。 使用RESTful API 可以在互联网任何位置存储和访问,容量和处理能力弹性扩展,多种存储类型供选择全面优化存储成本。

了解到这里,我感觉这个就是能拯救我的方案了,立马查阅文档开始试试。

确定使用场景

根据之前的需求,用户通过网站上传图片到服务器上,然后对图片进行识别处理。

改用oss方案后,用户通过使用oss控制台客户端上传文件,项目监听到文件上传完成,再获取资源进行识别处理。

使用oss之后逻辑稍有改变,由于文件存放不在同一个地方,这里需要使用oss提供的事件通知来完成。

oss事件通知整体架构图如下(借用图一张) oss事件通知整体架构

由图中可以看到,oss事件通知提供了两种方式:

  • HttpServer http的方式,即配置一个自己项目的访问地址(公网),当匹配规则匹配到时,往该地址推送数据
  • MNS Queue 队列的方式,即在项目中订阅oss提供的主题,获取匹配规则推送的数据

这里要介绍的是Http方式,后面如果需要再使用队列方式试试。

小试牛刀

关于oss事件通知的配置,阿里云已经提供了一篇文档 如何使用OSS事件通知功能?, 虽然讲的很清楚了(事后才觉得清楚,在没会之前是蒙圈的),但我觉得有必要把一些要点再提一下,少踩坑!

第一步:配置事件通知

简单理解,就是当bucket有文件变化(上传,覆盖,删除,追加……)时给予通知

帮助文档提供的截图和截止文章当前时间已经有所不同,下面带大家一步一步配置

首先进入oss管理控制台如下图: oss-console

选中bucket,右侧页面进入到该bucket的相关信息,然后点击【事件通知】,由于我已经创建了一条规则,所以这里 能看到有一条规则 create-rule

点击【创建规则】,在右侧会弹出界面,如下图所示,需要填写规则名称,事件类型,资源描述,接受终端 creating-rule

  • 规则名称
    • 规则的唯一标识,同一账号同一 Region 同一产品下规则名称不能重名。字符必须是英文,数字,横划线,长度不超过 128 个
  • 事件类型
    • 选择您想要触发通知的事件,可以选择多个。同样的事件不可以多次配置在同一资源上
    • PutObject ,创建/覆盖文件:简单上传
    • 这里我只举一个例子,更多其他的事件类型参考事件类型列表
  • 资源描述
    • 资源描述可以是全名、前缀、后缀以及前后缀,不同资源描述不能有交集。
    • 在本次实例中,我配置了全名,前后缀
      • 第一行代表固定文件名(movie.zip)上传,就会触发事件
      • 第二行代表以m开头的文件上传,就会触发事件
      • 第三行代表.jpg格式的文件上传,就会触发事件
    • 在我第一次使用时这个地方没有理解资源描述是什么意思,我上传文件怎么都不触发,直到发送工单,才得知是这里的问题
  • 接受终端
    • 有两种可以选择,一个是http,一个是队列,本篇讲的是http方式,这里我选择http
    • http://domain.com:8080/oss/listener 地址是公网能够访问到的

填写完毕后,点击底部的【确认】按钮确认添加该规则,确认之后就会像之前看到的,出现一条规则。

这里需要注意的是,每添加一条规则会自动创建一条对应的主题,可以在消息服务控制台查看到该主题,如下图: oss-topic

还需要注意的是图中指出的温馨提示,主题实例是会产生费用的

到这里,在阿里云控制台配置的工作就已经完成了,接下来要做的就是实现接受消息

第二步:接受消息通知

在上一步接受终端操作中我配置了一个可以公网访问的地址http://domain.com:8080/oss/listener,他的作用就是当规则匹配之后 oss会向该地址发送消息。根据oss技术人员提供的信息,我们可以从 MNS Java SDK 这个页面下载示例代码

解压下载后的文件,我们需要关注的是 HttpEndpoint.java 这个文件,这里对该文件代码不做详细的解释,根据自己项目的环境,做相应改动即可

我的项目环境是spring mvc,下面贴出我已经调试好的代码:

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpEntity;
import org.apache.http.HttpStatus;
import org.apache.http.entity.InputStreamEntity;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

/**
 * oss文件上传成功
 *
 * @return
 */
@PostMapping(value = "/oss/listener")
@ResponseBody
public ResponseEntity<String> ossfileuploadsuccess(HttpServletRequest request, HttpServletResponse response) {
    log.info("oss客户端上传文件");
    long start = System.currentTimeMillis();
    try {
        HttpEntity entity = new InputStreamEntity(request.getInputStream(),
                request.getContentLength());

        String method = request.getMethod().toUpperCase(Locale.ENGLISH);

        String target = request.getRequestURI();


        Enumeration<String> headerNames = request.getHeaderNames();
        Map<String, String> hm = new HashMap<>();
        while (headerNames.hasMoreElements()) {
            String name = headerNames.nextElement();
            String value = request.getHeader(name);
            log.debug("{}:{}", name, value);
            hm.put(name, value);
        }

        //verify request
        String certHeader = request.getHeader("x-mns-signing-cert-url");
        if (certHeader == null) {
            log.info("SigningCerURL Header not found");
            return ResponseEntity.status(HttpStatus.SC_BAD_REQUEST).body("SigningCerURL Header not found");
        }

        String cert = certHeader;
        if (cert.isEmpty()) {
            log.info("SigningCertURL empty");
            return ResponseEntity.status(HttpStatus.SC_BAD_REQUEST).body("SigningCertURL empty");
        }
        cert = new String(Base64.decodeBase64(cert));
        log.info("SigningCertURL:\t" + cert);


        if (!authenticate(method, target, hm, cert)) {
            log.info("authenticate fail");
            return ResponseEntity.status(HttpStatus.SC_BAD_REQUEST).body("authenticate fail");
        }

        //parser content of simplified notification
        InputStream is = entity.getContent();
        BufferedReader reader = new BufferedReader(new InputStreamReader(is));
        StringBuffer buffer = new StringBuffer();
        String line = "";
        while ((line = reader.readLine()) != null) {
            buffer.append(line);
        }
        String content = buffer.toString();
        String result = null;
        byte[] messageBodyAsBytes = content.getBytes();
        if (messageBodyAsBytes != null) {
            result = new String(Base64.decodeBase64(messageBodyAsBytes), "UTF-8");
        }
        //这里的result就是文件的信息
        log.info("Simplified Notification: \n {}", result);
        
        log.info("处理oss文件线程启动完毕,耗时:{}", System.currentTimeMillis() - start);
    } catch (UnsupportedEncodingException var4) {
        log.error("Not support encoding: UTF-8", var4);
        return ResponseEntity.status(HttpStatus.SC_INTERNAL_SERVER_ERROR).body("Not support encoding: UTF-8");
    } catch (Exception e) {
        log.error("oss文件上传监听系统错误", e);
        return ResponseEntity.status(HttpStatus.SC_INTERNAL_SERVER_ERROR).body("system error");
    }
    return ResponseEntity.noContent().build();
}

/**
 * check if this request comes from MNS Server
 *
 * @param method,  http method
 * @param uri,     http uri
 * @param headers, http headers
 * @param cert,    cert url
 * @return true if verify pass
 */
private Boolean authenticate(String method, String uri, Map<String, String> headers, String cert) {
    String str2sign = getSignStr(method, uri, headers);
    //System.out.println(str2sign);
    //这里需要注意大小写,官方给出的代码这里是大写A,在我实际操作中为小写,到底是什么,需要结合实际情况
    String signature = headers.get("authorization");
    byte[] decodedSign = Base64.decodeBase64(signature);
    //get cert, and verify this request with this cert
    try {
        //String cert = "http://mnstest.oss-cn-hangzhou.aliyuncs.com/x509_public_certificate.pem";
        URL url = new URL(cert);
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        DataInputStream in = new DataInputStream(conn.getInputStream());
        CertificateFactory cf = CertificateFactory.getInstance("X.509");

        Certificate c = cf.generateCertificate(in);
        PublicKey pk = c.getPublicKey();

        java.security.Signature signetcheck = java.security.Signature.getInstance("SHA1withRSA");
        signetcheck.initVerify(pk);
        signetcheck.update(str2sign.getBytes());
        Boolean res = signetcheck.verify(decodedSign);
        return res;
    } catch (Exception e) {
        e.printStackTrace();
        log.warn("authenticate fail, " + e.getMessage());
        return false;
    }
}

/**
 * build string for sign
 *
 * @param method,  http method
 * @param uri,     http uri
 * @param headers, http headers
 * @return String fro sign
 */
private String getSignStr(String method, String uri, Map<String, String> headers) {
    StringBuilder sb = new StringBuilder();
    sb.append(method);
    sb.append("\n");
    //注意大小写
    sb.append(safeGetHeader(headers, "content-md5"));
    sb.append("\n");
    //注意大小写
    sb.append(safeGetHeader(headers, "content-type"));
    sb.append("\n");
    //注意大小写
    sb.append(safeGetHeader(headers, "date"));
    sb.append("\n");

    List<String> tmp = new ArrayList<String>();
    for (Map.Entry<String, String> entry : headers.entrySet()) {
        if (entry.getKey().startsWith("x-mns-")){
            tmp.add(entry.getKey() + ":" + entry.getValue());
        }
    }
    Collections.sort(tmp);

    for (String kv : tmp) {
        sb.append(kv);
        sb.append("\n");
    }

    sb.append(uri);
    return sb.toString();
}

private String safeGetHeader(Map<String, String> headers, String name) {
        if (headers.containsKey(name)) {
            return headers.get(name);
        } else {
            return "";
        }
    }

需要注意的是由于主题默认的重试策略为 EXPONENTIAL_DECAY_RETRY ,上面的程序需要在[5s]之类返回[204]httpcode,否则阿里认为该推送为不成功。 即,按照重试规则来重新推送数据直到收到204代码为止

代码虽然很多,但是不复杂,其实就是对request进行解析,进行验证,把oss包含信息取出来,最终获取到base64编码的文件信息。

第三步:检验成果

把项目启动,然后使用oss控制台客户端上传文件 oss-upload-success

当文件上传完毕后,在java控制台打印出如下数据:

{
  "events": [
    {
      "eventName": "ObjectCreated:PutObject",
      "eventSource": "acs:oss",
      "eventTime": "2018-02-22T08:32:21.000Z",
      "eventVersion": "1.0",
      "oss": {
        "bucket": {
          "arn": "acs:oss:cn-beijing:1234346345345345:display-sjf-event-notification-test",
          "name": "display-sjf-event-notification-test",
          "ownerIdentity": "1234346345345345",
          "virtualBucket": ""
        },
        "object": {
          "deltaSize": 312148,
          "eTag": "D9703079D307A57C4967B30AC36FCA81",
          "key": "20170731095417740.png",
          "size": 312148
        },
        "ossSchemaVersion": "1.0",
        "ruleId": "oss-upload-success-img-http"
      },
      "region": "ch-china",
      "requestParameters": {
        "sourceIPAddress": "10.0.0.0"
      },
      "responseElements": {
        "requestId": "5A815115776D389D9FE118C1"
      },
      "userIdentity": {
        "principalId": "12345678934534534"
      }
    }
  ]
}

//参数解释
{
  "events": [
    {
      "eventName": "", //事件通知类型
      "eventSource": "", //消息源,固定为"acs:oss"
      "eventTime": "", //事件事件,格式为ISO-8601
      "eventVersion": "", //版本号,目前为"1.0"
      "oss": {
        "bucket": {
          "arn": "", //bucket的唯一标识符,格式为"acs:oss:region:uid:bucket"
          "name": "", //bucket名称
          "ownerIdentity": "" //bucket的owner
        },
        "object": {
          "deltaSize":, //object大小的变化量,比如新增一个文件,这个值就是文件大小,如果是覆盖一个文件,这个值就是新文件与旧文件的差值,因此可能为负数
          "eTag": "", //object的etag,与GetObject()请求返回的ETag头的内容相同
          "key": "", //object名称
          "position":, //可变项,只有在ObjectCreated:AppendObject事件中才有,表示此次请求开始append的位置,注意是从0开始
          "readFrom":, //可变项,只有在ObjectDownloaded:GetObject事件中才有,表示文件开始读取的位置,如果不是Range请求,则此项为0,否则则是Range请求的开始字节,注意是从0开始
          "readTo":,//可变项,只有在ObjectDownloaded:GetObject事件中才有,表示文件最后读取的位置,如果不是Range请求,则此项为文件的大小,否则则是Range请求的结束字节增1
          "size"://object大小
        }
        "ossSchemaVersion": "", //此字段域的版本号,目前为"1.0"
        "ruleId": "GetObject" //此事件匹配的规则ID
      },
      "region": "", //bucket所在的region
      "requestParameters": {
        "sourceIPAddress": "" //请求的源IP
      },
      "responseElements": {
        "requestId": ""  //请求对应的requestid
      },
      "userIdentity": {
        "principalId": ""  //请求发起者的uid
      },
      "xVars": {
        //oss的callback功能中的自定义参数
        "x:callback-var1": "value1",
        "x:vallback-var2": "value2"
      }
    }
  ]
}

结语

在信息化时代,我们需要跟上时代的步伐,与时俱进。本篇关于oss事件通知的介绍到此结束,希望对家有所帮助。