
Can I Upload Configuration Files for Flink Jar Jobs?

Uploading a Configuration File for a Flink Jar Job

You can upload configuration files for Flink custom (Jar) jobs as follows:

  1. Upload the configuration file to DLI through Package Management.
  2. In the Other Dependencies area of the Flink Jar job, select the created DLI package.
  3. Load the file in your code through ClassName.class.getClassLoader().getResource("userData/fileName"), where fileName indicates the name of the file to be accessed and ClassName indicates the name of the class that accesses the file. A minimal sketch is provided after this list.
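
The following is a minimal sketch of step 3, assuming a hypothetical class MyFlinkJob and a hypothetical file userData/my.conf; it resolves the packaged file through the class loader and reads it into a Properties object:

    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.net.URL;
    import java.util.Properties;

    public class MyFlinkJob {
        private static Properties loadConf(String fileName) throws Exception {
            // fileName is relative to the dependency package, for example "userData/my.conf".
            URL url = MyFlinkJob.class.getClassLoader().getResource(fileName);
            if (url == null) {
                throw new IllegalStateException("File not found on the classpath: " + fileName);
            }
            Properties props = new Properties();
            try (InputStream in = url.openStream()) {
                props.load(new InputStreamReader(in, "UTF-8"));
            }
            return props;
        }
    }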

Using a Configuration File

  • Solution 1: Load the file content into memory in the main function and broadcast the content to each TaskManager. This method is suitable when only a small number of variables need to be loaded in advance. A sketch is provided at the end of this section.
  • Solution 2: Load the file in the operator's open() method during initialization. Either a relative or an absolute path can be used.

    Take a Kafka sink as an example, where two files (userData/kafka-sink.conf and userData/client.truststore.jks) need to be loaded.

    • Example of using a relative path:
      Relative path: confPath = userData/kafka-sink.conf
      @Override
      public void open(Configuration parameters) throws Exception {
          super.open(parameters);
          initConf();
          producer = new KafkaProducer<>(props);
      }

      private void initConf() {
          try {
              // Resolve the file packaged under userData/ through the class loader.
              URL url = DliFlinkDemoDis2Kafka.class.getClassLoader().getResource(confPath);
              if (url == null) {
                  LOGGER.error("kafka conf file not found: " + confPath);
                  return;
              }
              LOGGER.info("kafka main-url: " + url.getFile());
              try (InputStream inputStream = new BufferedInputStream(new FileInputStream(new File(url.getFile()).getAbsolutePath()))) {
                  props.load(new InputStreamReader(inputStream, "UTF-8"));
              }
              topic = props.getProperty("topic");
              partition = Integer.parseInt(props.getProperty("partition"));
              validProps();
          } catch (Exception e) {
              LOGGER.error("load kafka conf failed", e);
          }
      }
      Figure 1 Example relative path configuration
    • Example of using an absolute path:
      Relative path of the configuration file: confPath = userData/kafka-sink.conf
      Absolute path resolved at runtime: path = /opt/data1/hadoop/tmp/usercache/omm/appcache/application_xxx_0015/container_xxx_0015_01_000002/userData/client.truststore.jks
      @Override
      public void open(Configuration parameters) throws Exception {
          super.open(parameters);
          initConf();
          // Resolve the absolute path of the truststore file at runtime.
          String path = DliFlinkDemoDis2Kafka.class.getClassLoader().getResource("userData/client.truststore.jks").getPath();
          LOGGER.info("kafka abs path " + path);
          props.setProperty("ssl.truststore.location", path);
          producer = new KafkaProducer<>(props);
      }

      private void initConf() {
          try {
              // Resolve the file packaged under userData/ through the class loader.
              URL url = DliFlinkDemoDis2Kafka.class.getClassLoader().getResource(confPath);
              if (url == null) {
                  LOGGER.error("kafka conf file not found: " + confPath);
                  return;
              }
              LOGGER.info("kafka main-url: " + url.getFile());
              try (InputStream inputStream = new BufferedInputStream(new FileInputStream(new File(url.getFile()).getAbsolutePath()))) {
                  props.load(new InputStreamReader(inputStream, "UTF-8"));
              }
              topic = props.getProperty("topic");
              partition = Integer.parseInt(props.getProperty("partition"));
              validProps();
          } catch (Exception e) {
              LOGGER.error("load kafka conf failed", e);
          }
      }
      Figure 2 Example absolute path configuration
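
For Solution 1, the following is a minimal sketch (the class name DliFlinkDemoSolution1 and the sample pipeline are assumptions, not part of the original example): the configuration file is read once in the main function, and the loaded value is passed to the operator through its constructor, so Flink ships it to every TaskManager together with the serialized function. A broadcast stream or broadcast state could be used instead if the configuration is larger or changes at runtime.

    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.net.URL;
    import java.util.Properties;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class DliFlinkDemoSolution1 {
        public static void main(String[] args) throws Exception {
            // Load the packaged configuration file once, in the main function.
            URL url = DliFlinkDemoSolution1.class.getClassLoader().getResource("userData/kafka-sink.conf");
            if (url == null) {
                throw new IllegalStateException("userData/kafka-sink.conf not found on the classpath");
            }
            Properties props = new Properties();
            try (InputStream in = url.openStream()) {
                props.load(new InputStreamReader(in, "UTF-8"));
            }
            String topic = props.getProperty("topic");

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.fromElements("a", "b", "c")
                    .map(new ConfAwareMapper(topic))   // the value travels with the serialized function
                    .print();
            env.execute("solution-1-demo");
        }

        // The configuration value is a field of this serializable function, so every
        // TaskManager receives a copy when the function instance is deployed.
        private static class ConfAwareMapper extends RichMapFunction<String, String> {
            private final String topic;

            ConfAwareMapper(String topic) {
                this.topic = topic;
            }

            @Override
            public String map(String value) {
                return topic + ":" + value;
            }
        }
    }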